[GitHub] [kafka] PrasanthV454 opened a new pull request, #13922: [MINOR] remove the currentStream.close() statement causing exit code issue
PrasanthV454 opened a new pull request, #13922: URL: https://github.com/apache/kafka/pull/13922 currentStream shouldn't be closed, as it is stderr or stdout; only tempStream should be closed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] github-actions[bot] commented on pull request #13376: KAFKA-14091: Leader proactively aborting tasks from lost workers in rebalance in EOS mode
github-actions[bot] commented on PR #13376: URL: https://github.com/apache/kafka/pull/13376#issuecomment-1610635915 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] github-actions[bot] commented on pull request #13478: KAFKA-14870: Fix KerberosLogin#relogin to invoke super#login when cre…
github-actions[bot] commented on PR #13478: URL: https://github.com/apache/kafka/pull/13478#issuecomment-1610635895 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1610578143 @kirktrue thank you for the reply! In your case, although ``isBalanced`` returns false, ``performReassignments`` still won't reassign any partition, because it only reassigns when ``currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1``; it just loops over all partitions in the reassignable list, finds none that match, so ``modified`` stays ``false`` and it finishes. I think the problem is that ``isBalanced`` and ``performReassignments`` should use the same predicate when deciding whether to reassign: currently ``performReassignments`` only reassigns when the partition-count gap between two consumers is at least 2, while ``isBalanced`` already treats a gap of 1 as unbalanced. In such a situation, even without modifying ``isBalanced``, the reassignment logic is still correct, but it can be very slow: ``performReassignments`` does a lot of unnecessary work, so it eventually finishes but takes a long time. You can try the unit test ``testLargeAssignmentAndGroupWithNonEqualSubscription`` without this PR's change: with ``partitionCount`` = 20 and ``consumerCount`` = 2 it completes successfully, but with ``partitionCount`` = 200 and ``consumerCount`` = 20 it may run for a very long time. I ran the test and saw no result after 14 minutes, while it finishes in 47 seconds after applying my change.
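The mismatch described above can be reduced to two thresholds. The following is a deliberately simplified sketch of that disagreement, not the actual `AbstractStickyAssignor` code; the method names and the reduction to plain partition counts are illustrative assumptions:

```java
// Deliberately simplified illustration of the two predicates discussed above;
// this is NOT the actual AbstractStickyAssignor code, just the thresholds the
// comment says each side uses.
public class PredicateMismatch {

    // isBalanced-style check: a gap of 1 partition between two consumers is
    // already enough to flag the assignment as improvable.
    public static boolean looksUnbalanced(int larger, int smaller) {
        return larger - smaller >= 1;
    }

    // performReassignments-style check: a partition only moves when the donor
    // owns at least 2 more partitions than the receiver.
    public static boolean wouldReassign(int larger, int smaller) {
        return larger > smaller + 1;
    }

    public static void main(String[] args) {
        // With sizes 2 and 1 the predicates disagree: the assignment is flagged
        // as unbalanced, yet no reassignment would ever be performed, so the
        // assignor keeps scanning partitions without making progress.
        System.out.println(looksUnbalanced(2, 1)); // true
        System.out.println(wouldReassign(2, 1));   // false
    }
}
```

When the two thresholds agree, an assignment flagged as unbalanced always admits at least one move, which is why aligning the predicates avoids the wasted scanning described in the comment.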
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244508147 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the next metadata refresh time. + * + * @param nextTimeMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setNextMetadataRefreshTime( +long nextTimeMs, +int groupEpoch +) { +this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch); +} + +/** + * Resets the next metadata refresh. + */ +public void resetNextMetadataRefreshTime() { +this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY; +} + +/** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The next update time is smaller or equals to the current time; + * 2) The group epoch associated with the next update time is smaller than Review Comment: This is also the case when we reset `nextMetadataRefreshTime` right?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244507539 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the next metadata refresh time. + * + * @param nextTimeMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setNextMetadataRefreshTime( +long nextTimeMs, +int groupEpoch +) { +this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch); +} + +/** + * Resets the next metadata refresh. Review Comment: This means we should update immediately?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244506236 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -119,6 +131,18 @@ public String toString() { */ private final TimelineHashMap> currentPartitionEpoch; +/** + * The next metadata refresh time. It consists of a timestamp in milliseconds together with + * the group epoch at the time of setting it. The metadata refresh time is considered as a + * soft state (read that it is not stored in a timeline data structure). It is like this + * because it is not persisted to the log. The group epoch is here to ensure that the + * next metadata refresh time is invalidated if the group epoch does not correspond to + * the current group epoch. This can happen if the next metadata refresh time is updated + * after having refreshed the metadata but the write operation failed. In this case, the + * time is not automatically rollback. Review Comment: nit: automatically rolled back. Also in this case, do we not update the refresh time until the epoch bump and write succeed? And we will keep refreshing the metadata in the meantime?
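The two refresh conditions described in the quoted javadoc (the scheduled time has been reached, or the group epoch recorded with it lags the current epoch) can be sketched as follows. `RefreshCheck` and this `TimeAndEpoch` shape are illustrative stand-ins based only on the quoted comments, not the PR's actual classes:

```java
// Illustrative stand-ins for the PR's TimeAndEpoch / refresh check; the names
// and exact semantics are assumptions drawn from the quoted javadoc.
public class RefreshCheck {

    public record TimeAndEpoch(long timeMs, int epoch) {
        // EMPTY forces an immediate refresh: its time is already in the past.
        public static final TimeAndEpoch EMPTY = new TimeAndEpoch(Long.MIN_VALUE, 0);
    }

    // A refresh is required when the scheduled time has been reached, or when
    // the epoch recorded with it is behind the current group epoch (e.g. the
    // time was set but the associated write failed and was never rolled back).
    public static boolean refreshMetadataNeeded(TimeAndEpoch next, long nowMs, int groupEpoch) {
        return next.timeMs() <= nowMs || next.epoch() < groupEpoch;
    }

    public static void main(String[] args) {
        TimeAndEpoch next = new TimeAndEpoch(1_000L, 5);
        System.out.println(refreshMetadataNeeded(next, 999L, 5));   // false: not due yet
        System.out.println(refreshMetadataNeeded(next, 1_000L, 5)); // true: due
        System.out.println(refreshMetadataNeeded(next, 500L, 6));   // true: stale epoch
        System.out.println(refreshMetadataNeeded(TimeAndEpoch.EMPTY, 0L, 5)); // true: reset
    }
}
```

The epoch comparison is what makes the soft (non-persisted) state safe: a refresh time recorded under a failed write carries a stale epoch and is simply ignored.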
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244500812 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or Review Comment: I guess my question then is what is the flow for updating the groups with the image? This will just happen on the next heartbeat since we set metadataImage to new image? We don't really do any notifying besides resetting the refresh timer.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or Review Comment: As mentioned before, I got confused and thought this was actually adding topics. If this is just a call to update the existing groups with these topics, this is fine. (Since new topics won't be part of groups yet)
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244496874 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or +// deleted topics. +Set allGroupIds = new HashSet<>(); +delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> { +String topicName = topicDelta.name(); +Set groupIds = groupsByTopics.get(topicName); +if (groupIds != null) allGroupIds.addAll(groupIds); +}); +delta.topicsDelta().deletedTopicIds().forEach(topicId -> { +TopicImage topicImage = delta.image().topics().getTopic(topicId); +Set groupIds = groupsByTopics.get(topicImage.name()); +if (groupIds != null) allGroupIds.addAll(groupIds); Review Comment: Oh I misunderstood -- this is to add to the list of groups to notify.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244495117 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or Review Comment: Is created topics a different method in topicsDelta? Shouldn't we have `createdTopicIds` and we add them? Or is changedTopics accounting for that? Are there other changes besides topic creation we can have?
[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor
lihaosky commented on code in PR #13851: URL: https://github.com/apache/kafka/pull/13851#discussion_r1244450143 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig, return brokerSideConfigEntry.value(); } +public Map> getTopicPartitionInfo(final Set topics) { +log.debug("Starting to describe topics {} in partition assignor.", topics); + +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; + +Set topicsToDescribe = new HashSet<>(topics); +final Map> topicPartitionInfo = new HashMap<>(); + +while (!topicsToDescribe.isEmpty()) { +final Map> existed = getTopicPartitionInfo(topicsToDescribe, null); +topicPartitionInfo.putAll(existed); +topicsToDescribe.removeAll(topicPartitionInfo.keySet()); +if (!topicsToDescribe.isEmpty()) { +currentWallClockMs = time.milliseconds(); + +if (currentWallClockMs >= deadlineMs) { +final String timeoutError = String.format( +"Could not create topics within %d milliseconds. " + +"This can happen if the Kafka cluster is temporarily not available.", +retryTimeoutMs); +log.error(timeoutError); Review Comment: Is this mimicking the `makeReady` function logs?
## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig, return brokerSideConfigEntry.value(); } +public Map> getTopicPartitionInfo(final Set topics) { +log.debug("Starting to describe topics {} in partition assignor.", topics); + +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; + +Set topicsToDescribe = new HashSet<>(topics); +final Map> topicPartitionInfo = new HashMap<>(); + +while (!topicsToDescribe.isEmpty()) { +final Map> existed = getTopicPartitionInfo(topicsToDescribe, null); +topicPartitionInfo.putAll(existed); +topicsToDescribe.removeAll(topicPartitionInfo.keySet()); +if (!topicsToDescribe.isEmpty()) { +currentWallClockMs = time.milliseconds(); + +if (currentWallClockMs >= deadlineMs) { +final String timeoutError = String.format( +"Could not create topics within %d milliseconds. " + +"This can happen if the Kafka cluster is temporarily not available.", +retryTimeoutMs); +log.error(timeoutError); +throw new TimeoutException(timeoutError); +} +log.info( Review Comment: Is this mimicking the `makeReady` function logs?
## streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java: ## @@ -0,0 +1,164 @@ +package org.apache.kafka.streams.processor.internals; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RackAwareTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); + +private final Cluster fullMetadata; +private final Map> partitionsForTask; +private final Map>> processRacks; +private final AssignmentConfigs assignmentConfigs; +private final Map> racksForPartition; +private final InternalTopicManager internalTopicManager; +private Boolean canEnableForActive; + +public RackAwareTaskAssignor(final Cluster fullMetadata, + final Map> partitionsForTask, + final Map> tasksForTopicGroup, + final Map>> processRacks, + final InternalTopicManager
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244492766 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or +// deleted topics. +Set allGroupIds = new HashSet<>(); +delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> { +String topicName = topicDelta.name(); +Set groupIds = groupsByTopics.get(topicName); +if (groupIds != null) allGroupIds.addAll(groupIds); +}); +delta.topicsDelta().deletedTopicIds().forEach(topicId -> { +TopicImage topicImage = delta.image().topics().getTopic(topicId); +Set groupIds = groupsByTopics.get(topicImage.name()); +if (groupIds != null) allGroupIds.addAll(groupIds); Review Comment: we don't want to add all the deleted topics do we?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -727,6 +800,80 @@ public void replay( + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone."); } consumerGroup.removeMember(memberId); +updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); +} +} + +/** + * @return The set of groups subscribed to the topic. + */ +public Set groupsSubscribedToTopic(String topicName) { +Set groups = groupsByTopics.get(topicName); +return groups != null ? groups : Collections.emptySet(); +} + +/** + * Subscribes a group to a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ +private void subscribeGroupToTopic( +String groupId, +String topicName +) { +groupsByTopics +.computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) +.add(groupId); +} + +/** + * Unsubscribes a group from a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ +private void unsubscribeGroupFromTopic( Review Comment: should return be groupsIds type here?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244482511 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -709,14 +780,16 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); +ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null); +Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames()); + if (value != null) { -ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true); consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember) .updateWith(value) .build()); +updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); Review Comment: could this and line 803 be outside the if/else? I'm just curious why we moved consumerGroup and old SubscribedTopicNames out.
[GitHub] [kafka] C0urante commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector
C0urante commented on code in PR #13446: URL: https://github.com/apache/kafka/pull/13446#discussion_r1244478202 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -150,10 +156,31 @@ private void loadInitialConsumerGroups() List findConsumerGroups() throws InterruptedException, ExecutionException { -return listConsumerGroups().stream() +List filteredGroups = listConsumerGroups().stream() .map(ConsumerGroupListing::groupId) -.filter(this::shouldReplicate) +.filter(this::shouldReplicateByGroupFilter) .collect(Collectors.toList()); + +List checkpointGroups = new LinkedList<>(); +List irrelevantGroups = new LinkedList<>(); + +for (String group : filteredGroups) { +Set consumedTopics = listConsumerGroupOffsets(group).keySet().stream() +.map(TopicPartition::topic) +.filter(this::shouldReplicateByTopicFilter) Review Comment: I don't believe MM2 ever supported that. From [KIP-382](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-InternalTopics), regarding checkpoints (emphasis mine): > The connector will periodically query the source cluster for all committed offsets from all consumer groups, **filter for those topics being replicated**, and emit a message to a topic IIUC, MM2 also performs offset syncing based on the contents of the offset syncs topic, which is only populated by the source connector (i.e., the connector that replicates topics). @blacktooth have you experienced otherwise?
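The filtering KIP-382 describes, and which the quoted `findConsumerGroups` change implements, boils down to checkpointing only groups that have committed offsets on at least one replicated topic. A minimal sketch of that idea follows; the class and method names here are hypothetical, not MirrorCheckpointConnector's actual API:

```java
import java.util.*;
import java.util.function.Predicate;

// Minimal sketch of the KIP-382 behavior discussed above: a checkpoint is only
// worth emitting for groups with committed offsets on replicated topics. The
// names here are hypothetical, not MirrorCheckpointConnector's real API.
public class CheckpointGroupFilter {

    public static List<String> groupsToCheckpoint(Map<String, Set<String>> consumedTopicsByGroup,
                                                  Predicate<String> topicFilter) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : consumedTopicsByGroup.entrySet()) {
            // Keep the group if at least one of its consumed topics is replicated.
            if (e.getValue().stream().anyMatch(topicFilter)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> offsets = new LinkedHashMap<>();
        offsets.put("group-a", Set.of("orders", "heartbeats.internal"));
        offsets.put("group-b", Set.of("heartbeats.internal"));
        // Suppose the topic filter excludes *.internal topics from replication.
        System.out.println(groupsToCheckpoint(offsets, t -> !t.endsWith(".internal")));
        // prints [group-a]
    }
}
```

Groups like `group-b` above, whose offsets are entirely on non-replicated topics, are exactly the "irrelevant" groups the PR sets aside.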
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244477507 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -88,10 +93,12 @@ public class GroupMetadataManager { public static class Builder { private LogContext logContext = null; private SnapshotRegistry snapshotRegistry = null; +private Time time = null; private List assignors = null; -private TopicsImage topicsImage = null; Review Comment: Did we change this to MetadataImage because it was easier to pass in that class? Or do we need features in the metadata image that were not in the topic image?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244475836 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -506,32 +555,54 @@ private CoordinatorResult consumerGr .setClientHost(clientHost) .build(); +boolean updatedMemberSubscriptions = false; if (!updatedMember.equals(member)) { records.add(newMemberSubscriptionRecord(groupId, updatedMember)); if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " + updatedMember.subscribedTopicNames()); +updatedMemberSubscriptions = true; +} -subscriptionMetadata = group.computeSubscriptionMetadata( -member, -updatedMember, -topicsImage -); - -if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " -+ subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); -} +if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { +log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +updatedMember.subscribedTopicRegex()); +updatedMemberSubscriptions = true; +} +} -groupEpoch += 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +long currentTimeMs = time.milliseconds(); +boolean maybeUpdateMetadata = updatedMemberSubscriptions || group.refreshMetadataNeeded(currentTimeMs); +boolean updatedSubscriptionMetadata = false; +if (maybeUpdateMetadata) { +subscriptionMetadata = group.computeSubscriptionMetadata( +member, +updatedMember, +metadataImage.topics() +); -log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); +if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { +log.info("[GroupId " + groupId + "] Computed new subscription metadata: " ++ subscriptionMetadata + "."); +records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +updatedSubscriptionMetadata = true; } } +if (updatedMemberSubscriptions || updatedSubscriptionMetadata) { +groupEpoch += 1; +records.add(newGroupEpochRecord(groupId, groupEpoch)); +log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); +} + +if (maybeUpdateMetadata) { Review Comment: While I am able to follow the logic and understand it is probably the best way to avoid duplicate code, I do wonder if there is a less confusing way to express this without the similar booleans. It may not be possible, but maybe we can add some comments.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244469064 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -179,26 +209,45 @@ GroupMetadataManager build() { private final int consumerGroupHeartbeatIntervalMs; /** - * The topics metadata (or image). + * The metadata refresh interval. */ -private TopicsImage topicsImage; +private final int consumerGroupMetadataRefreshIntervalMs; + +/** + * The metadata image. + * + * Package private for testing. Review Comment: this is just private right?
[GitHub] [kafka] jolshan commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on PR #13880: URL: https://github.com/apache/kafka/pull/13880#issuecomment-1610300192 Looks pretty good. I think if we want to do this as part of the PR, there's just this left: https://github.com/apache/kafka/pull/13880/files#r1244223827
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1610267331 I ran some tests with producer-perf. I didn't see noticeable differences, but the tests were not particularly long. I can run more if needed. The `--transaction-duration-ms` argument sets how many milliseconds elapse before each transaction is committed. Also note that only one partition was produced to, so only that one verification per transaction will trigger the metric. ``` bin/kafka-producer-perf-test.sh --transaction-duration-ms 1000 --record-size 1000 --throughput -1 --num-records 100 --topic test-topic --producer.config config/producer.properties TRUNK 100 records sent, 215842.866393 records/sec (205.84 MB/sec), 16.21 ms avg latency, 153.00 ms max latency, 0 ms 50th, 119 ms 95th, 147 ms 99th, 152 ms 99.9th. 100 records sent, 223513.634332 records/sec (213.16 MB/sec), 19.28 ms avg latency, 139.00 ms max latency, 1 ms 50th, 118 ms 95th, 133 ms 99th, 138 ms 99.9th. 100 records sent, 214638.334407 records/sec (204.70 MB/sec), 19.43 ms avg latency, 134.00 ms max latency, 1 ms 50th, 123 ms 95th, 131 ms 99th, 133 ms 99.9th. KAFKA-15028 100 records sent, 217485.863419 records/sec (207.41 MB/sec), 17.88 ms avg latency, 151.00 ms max latency, 0 ms 50th, 128 ms 95th, 145 ms 99th, 149 ms 99.9th. 100 records sent, 229568.411387 records/sec (218.93 MB/sec), 17.15 ms avg latency, 137.00 ms max latency, 0 ms 50th, 118 ms 95th, 130 ms 99th, 136 ms 99.9th. 100 records sent, 220653.133274 records/sec (210.43 MB/sec), 16.47 ms avg latency, 134.00 ms max latency, 1 ms 50th, 116 ms 95th, 128 ms 99th, 133 ms 99.9th. bin/kafka-producer-perf-test.sh --transaction-duration-ms 300 --record-size 1000 --throughput -1 --num-records 100 --topic test-topic --producer.config config/producer.properties TRUNK 100 records sent, 213812.272824 records/sec (203.91 MB/sec), 15.79 ms avg latency, 142.00 ms max latency, 1 ms 50th, 101 ms 95th, 136 ms 99th, 142 ms 99.9th.
100 records sent, 213174.163291 records/sec (203.30 MB/sec), 13.00 ms avg latency, 121.00 ms max latency, 1 ms 50th, 96 ms 95th, 118 ms 99th, 120 ms 99.9th. 100 records sent, 225580.870742 records/sec (215.13 MB/sec), 12.45 ms avg latency, 128.00 ms max latency, 1 ms 50th, 101 ms 95th, 123 ms 99th, 127 ms 99.9th. KAFKA-15028 100 records sent, 218531.468531 records/sec (208.41 MB/sec), 11.97 ms avg latency, 69.00 ms max latency, 1 ms 50th, 52 ms 95th, 65 ms 99th, 68 ms 99.9th. 100 records sent, 217864.923747 records/sec (207.77 MB/sec), 13.21 ms avg latency, 119.00 ms max latency, 1 ms 50th, 103 ms 95th, 117 ms 99th, 119 ms 99.9th. 100 records sent, 214868.929953 records/sec (204.91 MB/sec), 13.07 ms avg latency, 118.00 ms max latency, 1 ms 50th, 94 ms 95th, 115 ms 99th, 117 ms 99.9th. ```
[GitHub] [kafka] blacktooth commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector
blacktooth commented on code in PR #13446: URL: https://github.com/apache/kafka/pull/13446#discussion_r1244382978 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -150,10 +156,31 @@ private void loadInitialConsumerGroups() List<String> findConsumerGroups() throws InterruptedException, ExecutionException { -return listConsumerGroups().stream() +List<String> filteredGroups = listConsumerGroups().stream() .map(ConsumerGroupListing::groupId) -.filter(this::shouldReplicate) +.filter(this::shouldReplicateByGroupFilter) .collect(Collectors.toList()); + +List<String> checkpointGroups = new LinkedList<>(); +List<String> irrelevantGroups = new LinkedList<>(); + +for (String group : filteredGroups) { +Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream() +.map(TopicPartition::topic) +.filter(this::shouldReplicateByTopicFilter) Review Comment: Is this backward compatible? Can this break use-cases where user is using MM2 to sync the offsets but not necessarily topics?
[jira] [Updated] (KAFKA-15028) AddPartitionsToTxnManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15028: - Attachment: latency-cpu.html > AddPartitionsToTxnManager metrics > - > > Key: KAFKA-15028 > URL: https://issues.apache.org/jira/browse/KAFKA-15028 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Attachments: latency-cpu.html > > > KIP-890 added metrics for the AddPartitionsToTxnManager > VerificationTimeMs – number of milliseconds from adding partition info to the > manager to the time the response is sent. This will include the round trip to > the transaction coordinator if it is called. This will also account for > verifications that fail before the coordinator is called. > VerificationFailureRate – rate of verifications that returned in failure > either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
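The semantics of the VerificationTimeMs metric described above — elapsed milliseconds from enqueueing a verification to sending the response, including any coordinator round trip — can be illustrated with a minimal recorder. This is a hypothetical sketch; a plain list stands in for the Yammer histogram the broker actually uses, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class VerificationTimeSketch {
    private final LongSupplier clockMs;
    private final List<Long> histogram = new ArrayList<>(); // stand-in for a histogram

    public VerificationTimeSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Capture the time when partition info is added to the manager.
    public long start() {
        return clockMs.getAsLong();
    }

    // Record elapsed time when the response is sent (success or failure).
    public void complete(long startMs) {
        histogram.add(clockMs.getAsLong() - startMs);
    }

    public List<Long> recorded() {
        return histogram;
    }

    public static void main(String[] args) {
        long[] now = {100L};
        VerificationTimeSketch m = new VerificationTimeSketch(() -> now[0]);
        long t = m.start();
        now[0] = 245L; // verification, including the coordinator round trip, took 145 ms
        m.complete(t);
        System.out.println(m.recorded()); // prints [145]
    }
}
```

Measuring from enqueue rather than from the network call is what lets the metric also cover verifications that fail before the coordinator is contacted.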
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1610196155 Thanks for sharing this flame graph. I see that the histogram takes up the majority of the processCompletedSends, but compared to the total cpu usage, it's about 1%. I still think it is worth considering though, so I will look at perf and consider the debug level metric.
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1244331034 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -17,25 +17,37 @@ package kafka.server +import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName} import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} import java.util +import java.util.concurrent.TimeUnit import scala.collection.mutable object AddPartitionsToTxnManager { type AppendCallback = Map[TopicPartition, Errors] => Unit + + val verificationFailureRateMetricName = "VerificationFailureRate" Review Comment: Oh hmm -- so we would have to predefine or lazily create the meters with the various error messages. I'm wondering if this is still necessary. We should be able to figure out the errors from either the api error metrics or from the logging.
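The "lazily create the meters with the various error messages" idea mentioned above could look roughly like this. It is an illustrative sketch only: a `LongAdder` per error name stands in for the tagged Yammer Meter a real implementation would register, and the names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class PerErrorMeterSketch {
    // One counter per error name, created on first use. computeIfAbsent makes
    // the lazy creation safe under concurrent marking from multiple threads.
    private final Map<String, LongAdder> meters = new ConcurrentHashMap<>();

    public void mark(String errorName) {
        meters.computeIfAbsent(errorName, k -> new LongAdder()).increment();
    }

    public long count(String errorName) {
        LongAdder a = meters.get(errorName);
        return a == null ? 0L : a.sum();
    }

    public static void main(String[] args) {
        PerErrorMeterSketch meters = new PerErrorMeterSketch();
        meters.mark("COORDINATOR_NOT_AVAILABLE");
        meters.mark("COORDINATOR_NOT_AVAILABLE");
        System.out.println(meters.count("COORDINATOR_NOT_AVAILABLE")); // prints 2
    }
}
```

The trade-off the review lands on is that this per-error breakdown may be redundant if the same information is already available from the existing API error metrics or from logging.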
[GitHub] [kafka] xiaocairush commented on a diff in pull request #13884: MINOR: fix typos for client
xiaocairush commented on code in PR #13884: URL: https://github.com/apache/kafka/pull/13884#discussion_r1244247840 ## clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java: ## @@ -854,7 +854,7 @@ public void shouldThrowOnInvalidDateFormatOrNullTimestamp() { private void checkExceptionForGetDateTimeMethod(Executable executable) { assertTrue(assertThrows(ParseException.class, executable) -.getMessage().contains("Unparseable date")); +.getMessage().contains("Unparsable date")); Review Comment: fixed!
[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14945: - Labels: kip (was: ) > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Labels: kip > Fix For: 3.6.0 > > > JIRA for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]
[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14945: - Fix Version/s: 3.6.0 > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Fix For: 3.6.0 > > > JIRA for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]
[GitHub] [kafka] divijvaidya commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage
divijvaidya commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1610089621 Hey @Vaibhav-Nazare A KIP needs at least 3 committer votes and I believe we haven't heard from other folks in the community on the KIP. I am waiting for others to chime in. Otherwise, we can start a vote in 2 weeks (and hope that the KIP gets at least 3 votes).
[GitHub] [kafka] machi1990 commented on pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests
machi1990 commented on PR #13903: URL: https://github.com/apache/kafka/pull/13903#issuecomment-1610089280 Thank you @divijvaidya
[GitHub] [kafka] dependabot[bot] commented on pull request #13743: Bump requests from 2.24.0 to 2.31.0 in /tests
dependabot[bot] commented on PR #13743: URL: https://github.com/apache/kafka/pull/13743#issuecomment-1610085437 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. If you change your mind, just re-open this PR and I'll resolve any conflicts on it.
[GitHub] [kafka] divijvaidya closed pull request #13743: Bump requests from 2.24.0 to 2.31.0 in /tests
divijvaidya closed pull request #13743: Bump requests from 2.24.0 to 2.31.0 in /tests URL: https://github.com/apache/kafka/pull/13743
[GitHub] [kafka] divijvaidya merged pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests
divijvaidya merged PR #13903: URL: https://github.com/apache/kafka/pull/13903
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1244223827 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}.
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(Record record) { +// Tombstone is represented with a null value. +if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public Record deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeException { +final short recordType = readVersion(keyBuffer, "key"); +final ApiMessage keyMessage = apiMessageKeyFor(recordType); +readMessage(keyMessage, keyBuffer, recordType, "key"); + +if (valueBuffer == null) { +return new Record(new ApiMessageAndVersion(keyMessage, recordType), null); +} + +final ApiMessage valueMessage = apiMessageValueFor(recordType); +final short valueVersion = readVersion(valueBuffer, "value"); Review Comment: I will add a description of the format in the javadoc of the class.
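The version-prefixed layout being discussed — a signed short prefix followed by the serialized message, where the prefix selects the record *type* for keys but the schema *version* for values — can be sketched as follows. This is a hypothetical helper for illustration, not the actual RecordSerde/MessageUtil code.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class VersionPrefixSketch {
    // Write a 2-byte prefix followed by the opaque payload bytes.
    public static byte[] serialize(short versionOrType, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(versionOrType);
        buf.put(payload);
        return buf.array();
    }

    // Read the prefix, leaving the buffer positioned at the payload.
    public static short readPrefix(ByteBuffer buf) {
        if (buf.remaining() < 2) {
            throw new BufferUnderflowException();
        }
        return buf.getShort();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(serialize((short) 3, new byte[]{1, 2}));
        System.out.println(readPrefix(buf)); // prints 3
        System.out.println(buf.remaining()); // prints 2 (payload bytes remain)
    }
}
```

The asymmetry between the key prefix (record type) and the value prefix (version) is precisely what the review asks to spell out in the class javadoc.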
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)
vcrfxia commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1239143225 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ## @@ -112,6 +125,74 @@ private void pushNullValueToTable() { } } + +private void makeJoin(final Duration grace) { +final KStream stream; +final KTable table; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +builder = new StreamsBuilder(); + +final Consumed consumed = Consumed.with(Serdes.Integer(), Serdes.String()); +stream = builder.stream(streamTopic, consumed); +table = builder.table("tableTopic2", consumed, Materialized.as( +Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); +stream.join(table, +MockValueJoiner.TOSTRING_JOINER, +Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) +).process(supplier); +final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); +driver = new TopologyTestDriver(builder.build(), props); +inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + +processor = supplier.theCapturedProcessor(); +} + +@Test +public void shouldFailIfTableIsNotVersioned() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); +final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KTable tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + +final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, +() -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one")); +assertThat( +exception.getMessage(), +is("KTable must be versioned to use a grace period in a stream table join.") +); +} + +@Test +public void shouldDelayJoinByGracePeriod() { +makeJoin(Duration.ofMillis(2)); + +// push four items to the table. this should not produce any item. +pushToTableNonRandom(4, "Y"); +processor.checkAndClearProcessResult(EMPTY); + +// push all four items to the primary stream. this should produce two items. +pushToStream(4, "X"); +processor.checkAndClearProcessResult( +new KeyValueTimestamp<>(0, "X0+Y0", 0), +new KeyValueTimestamp<>(1, "X1+Y1", 1)); + +// push all items to the table. this should not produce any item +pushToTableNonRandom(4, "YY"); +processor.checkAndClearProcessResult(EMPTY); + +// push all four items to the primary stream. this should produce two items. +pushToStream(4, "X"); +processor.checkAndClearProcessResult( Review Comment: The reason this produces two output records is because the max timestamp seen so far is still 3, which means only records with timestamp 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer). Can you add another step to this test which now produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order which is different from the offset order that they arrived in.
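The buffering behavior the reviewer describes — hold stream records until the observed stream time exceeds their timestamp by the grace period, then release them in timestamp order rather than arrival order — can be sketched as follows. This is an illustrative model only, not the Kafka Streams implementation; records are simplified to `[timestamp, key]` pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class GraceBufferSketch {
    private final long graceMs;
    // Min-heap ordered by timestamp, so releases come out in timestamp order.
    private final PriorityQueue<long[]> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    private long streamTime = Long.MIN_VALUE;

    public GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Buffer a [timestamp, key] record; return all records whose timestamp is
    // now at least graceMs behind the max timestamp seen so far.
    public List<long[]> add(long timestamp, long key) {
        streamTime = Math.max(streamTime, timestamp);
        buffer.add(new long[]{timestamp, key});
        List<long[]> emitted = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek()[0] <= streamTime - graceMs) {
            emitted.add(buffer.poll());
        }
        return emitted;
    }
}
```

With a grace of 2 ms and records at timestamps 0..3, only the records at timestamps 0 and 1 are released (2 and 3 stay buffered until a later record advances stream time), matching the two-output behavior discussed in the review.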
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1244173231 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}.
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(Record record) { +// Tombstone is represented with a null value. +if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public Record deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeException { +final short recordType = readVersion(keyBuffer, "key"); +final ApiMessage keyMessage = apiMessageKeyFor(recordType); +readMessage(keyMessage, keyBuffer, recordType, "key"); + +if (valueBuffer == null) { +return new Record(new ApiMessageAndVersion(keyMessage, recordType), null); +} + +final ApiMessage valueMessage = apiMessageValueFor(recordType); +final short valueVersion = readVersion(valueBuffer, "value"); Review Comment: I can see this being confusing, but I'm not sure if I have a recommendation to make it clearer.
[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609922486 > if this does affect performance too much Maybe we could use some type of recording level for Yammer metrics too? We already have a configuration at: https://kafka.apache.org/documentation.html#brokerconfigs_metrics.recording.level. We could emit this metric at DEBUG level. Also, FYI, here [1] is a flamegraph where you can observe the impact of histogram.update() for requests. After opening the flamegraph in a browser, look at the call stack of processCompletedSends. [1] https://github.com/divijvaidya/flamegraph-samples/blob/main/kafka/kafka-summit-2023/100mslinger-cpu-unclean.html
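The recording-level idea suggested above can be reduced to a simple guard: a metric declared at DEBUG is only updated when the configured level includes DEBUG, so the histogram update stays off the hot path otherwise. This is a toy sketch, not the actual Kafka metrics code; all names are illustrative:

```java
public class RecordingLevelDemo {
    enum RecordingLevel { INFO, DEBUG }

    // Stand-in for the broker-wide metrics.recording.level setting.
    static final RecordingLevel configured = RecordingLevel.INFO;

    // A metric declared at a given level is recorded only when the
    // configured level is at least as verbose.
    static boolean shouldRecord(RecordingLevel metricLevel) {
        return metricLevel.ordinal() <= configured.ordinal();
    }

    public static void main(String[] args) {
        System.out.println(shouldRecord(RecordingLevel.INFO));  // true
        System.out.println(shouldRecord(RecordingLevel.DEBUG)); // false: update skipped under INFO
    }
}
```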
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1244076297 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS) + val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs") Review Comment: oh, I missed that. Sounds good.
[jira] [Commented] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability
[ https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737791#comment-17737791 ] Arushi Rai commented on KAFKA-15128: Hi [~ckamal] If possible, can you share the expected release version where this vulnerability will be resolved? > snappy-java-1.1.8.4.jar library vulnerability > - > > Key: KAFKA-15128 > URL: https://issues.apache.org/jira/browse/KAFKA-15128 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: priyatama >Priority: Major > Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png > > > Hi Team, > we found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so we > need to get rid of it. > !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! > During analysis, we found that snappy-java comes in via kafka-clients; > our application does not use the snappy-java jar directly. > Can anyone explain what snappy-java is used for in kafka-clients, or can > we exclude it? > The latest kafka-clients also ships the vulnerable snappy jar; when will the next > kafka-clients version with a non-vulnerable snappy-java jar be released? > cc: [Mickael > Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1244065077 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param The type of the record. */ -public interface CoordinatorLoader { +public interface CoordinatorLoader extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { +private final short unknownType; + +public UnknownRecordTypeException(short unknownType) { +super(String.format("Found an unknown record type %d", unknownType)); +this.unknownType = unknownType; +} + +public short unknownType() { +return unknownType; Review Comment: ah in the log message
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1244064321 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param The type of the record. */ -public interface CoordinatorLoader { +public interface CoordinatorLoader extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { Review Comment: Got it -- so it was handled as its own key and not catching a runtime exception.
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1244063578 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. 
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(Record record) { +// Tombstone is represented with a null value. +if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public Record deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeException { +final short recordType = readVersion(keyBuffer, "key"); +final ApiMessage keyMessage = apiMessageKeyFor(recordType); +readMessage(keyMessage, keyBuffer, recordType, "key"); + +if (valueBuffer == null) { +return new Record(new ApiMessageAndVersion(keyMessage, recordType), null); +} + +final ApiMessage valueMessage = apiMessageValueFor(recordType); +final short valueVersion = readVersion(valueBuffer, "value"); Review Comment: Ah I got confused by the `apiMessageValueFor` method. That one uses the key's version, but the value uses the record version. I see now.
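The version-prefixed framing that `serializeKey`/`serializeValue` write and `readVersion` reads back can be illustrated with a minimal sketch. This assumes only that the prefix is a two-byte big-endian short (the `ByteBuffer` default); it is not the actual `MessageUtil` code:

```java
import java.nio.ByteBuffer;

public class VersionPrefixDemo {
    // Frame a payload behind a two-byte version prefix, mirroring the
    // shape of the toVersionPrefixedBytes call discussed above.
    static byte[] toVersionPrefixedBytes(short version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(version);
        buf.put(payload);
        return buf.array();
    }

    // Read the version prefix back; the remaining bytes are the message body.
    static short readVersion(ByteBuffer buffer) {
        return buffer.getShort();
    }

    public static void main(String[] args) {
        byte[] framed = toVersionPrefixedBytes((short) 3, new byte[] {1, 2});
        ByteBuffer buf = ByteBuffer.wrap(framed);
        System.out.println(readVersion(buf)); // 3: the version prefix
        System.out.println(buf.remaining());  // 2: the payload bytes left to parse
    }
}
```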
[GitHub] [kafka] dajac commented on pull request #13921: MINOR: Reduce (hopefully) flakiness of testRackAwareRangeAssignor
dajac commented on PR #13921: URL: https://github.com/apache/kafka/pull/13921#issuecomment-1609889975 The test still fails. There is likely something else going on... I will keep investigating.
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609874663 > Although one histogram calculation here should be ok, but it would be nice if you get some producer-perf.sh data in as well to ensure that this metric isn't adversely impacting latency. Thoughts? I can take a look at this. Is there an alternative metric that you suggest if this does affect performance too much?
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1244044199 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -17,25 +17,37 @@ package kafka.server +import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName} import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} import java.util +import java.util.concurrent.TimeUnit import scala.collection.mutable object AddPartitionsToTxnManager { type AppendCallback = Map[TopicPartition, Errors] => Unit + + val verificationFailureRateMetricName = "VerificationFailureRate" Review Comment: Error code makes sense to me.
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1244035570 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS) + val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs") Review Comment: Histogram is biased by default. This code uses a biased histogram, just as the other metrics do.
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1244034254 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS) Review Comment: What version are we tagging if we don't end up sending the request though? I don't think it makes sense. We will also have the version on the api error metrics, right? I don't think version here is necessary.
[jira] [Updated] (KAFKA-15086) The unreasonable segment size setting of the internal topics in MM2 may cause the worker startup time to be too long
[ https://issues.apache.org/jira/browse/KAFKA-15086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-15086: --- Labels: kip-943 (was: ) > The unreasonable segment size setting of the internal topics in MM2 may cause > the worker startup time to be too long > > > Key: KAFKA-15086 > URL: https://issues.apache.org/jira/browse/KAFKA-15086 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.4.1 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > Labels: kip-943 > Attachments: WechatIMG364.jpeg, WechatIMG365.jpeg, WechatIMG366.jpeg > > > If the config 'segment.bytes' for the topics related to MM2 (such as > offset.storage.topic, config.storage.topic, status.storage.topic) follows the > default broker configuration or is set larger, then when the MM cluster runs > many complicated tasks, and especially when the log volume of the topic > 'offset.storage.topic' is very large, it will slow down the restart of the MM workers. > After investigation, the reason is that a consumer needs to be started to > read the data of 'offset.storage.topic' at startup. Although this topic is > set to compact, if the segment size is set to a large value, such as the > default value of 1G, the topic may hold tens of gigabytes of data that > cannot be compacted and has to be read from the earliest offset (because the active > segment cannot be cleaned), which consumes a lot of time. In our online > environment, this topic stored 13G of data and it took nearly half an hour for > all the data to be consumed, which left the worker unable to start and execute > tasks for a long time.
> Of course, the number of consumer threads could also be adjusted, but I think > it may be easier to reduce the segment size, for example to the default > value used by __consumer_offsets: 100MB > > The first picture in the attachment is the log size stored in the internal > topic, the second is the time when ‘offset.storage.topic’ starts to be > read, and the third is the time when reading ‘offset.storage.topic’ > finished. It took about 23 minutes in total.
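The mitigation suggested above amounts to a topic-level override on the MM2 internal topics. A sketch of the relevant topic configs (values are illustrative; 104857600 bytes is the 100 MB default used by __consumer_offsets, versus the 1 GB broker default for segment.bytes):

```properties
# Topic-level overrides for offset.storage.topic (and similarly for
# config.storage.topic / status.storage.topic), e.g. applied with
# kafka-configs.sh --alter.
cleanup.policy=compact
# 100 MB instead of the 1 GB broker default, so segments roll and become
# eligible for compaction much sooner, keeping startup reads short.
segment.bytes=104857600
```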
[GitHub] [kafka] mumrah merged pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions
mumrah merged PR #13910: URL: https://github.com/apache/kafka/pull/13910
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737711#comment-17737711 ] Erik van Oosten commented on KAFKA-14972: - I will complete the KIP tomorrow. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely.
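The access-id idea described in the issue can be sketched roughly as follows. This is purely illustrative of the proposal's shape — `AccessIdGuard` and all names are hypothetical, not the actual KafkaConsumer code:

```java
import java.util.ConcurrentModificationException;

public class AccessIdGuard {
    // The proposal: instead of comparing Thread.currentThread().getId(),
    // compare an access id carried in a thread-local that an async
    // runtime can set on whichever thread resumes the work.
    private static final ThreadLocal<Long> accessId = new ThreadLocal<>();
    private final long ownerId;

    AccessIdGuard(long ownerId) {
        this.ownerId = ownerId;
    }

    void acquire() {
        Long id = accessId.get();
        if (id == null || id != ownerId) {
            throw new ConcurrentModificationException(
                "consumer is not safe for multi-threaded access");
        }
    }

    public static void main(String[] args) {
        AccessIdGuard guard = new AccessIdGuard(42L);
        accessId.set(42L); // the runtime hands the id to the resuming thread
        guard.acquire();   // passes even though thread identity may differ
        System.out.println("acquired");
    }
}
```

Existing single-threaded programs would be unaffected because the consumer itself can install the id on its owning thread by default.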
[jira] [Commented] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues
[ https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737709#comment-17737709 ] Kirk True commented on KAFKA-7143: -- cc [~pnee] [~lianetm] > Cannot use KafkaConsumer with Kotlin coroutines due to various issues > - > > Key: KAFKA-7143 > URL: https://issues.apache.org/jira/browse/KAFKA-7143 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.1.0 >Reporter: Raman Gupta >Priority: Major > > I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin > [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which > supports a style of async programming that avoids the need for callbacks (and > existing callback-based APIs can usually easily be adapted to this style > with a simple wrapper). With coroutines, continuations are used instead: > methods with callbacks are suspended, and resumed once the call is complete. > With coroutines, while access to the KafkaConsumer is done in a thread-safe > way, it does NOT necessarily happen from a single thread -- a different > underlying thread may actually execute the code after the suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the > thread safety of the client, but that the *same thread* is being used -- if > the same thread (by id) is not being used the consumer throws an exception > like: > {code} > Exception in thread "ForkJoinPool.commonPool-worker-25" > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321) > {code} > I understand this check is present to protect people from themselves, but I'd > like the ability to disable this check so that this code can be used > effectively by libraries such as Kotlin coroutines. > There is a workaround for the above: run the consumer in a coroutine with a > single-thread context, which isn't ideal because it dedicates a thread to the > consumer. > However, further problems await -- the `commitAsync` method also cannot be > used with coroutines because the callback is never executed and therefore the > coroutine is never resumed past the suspension point. Upon investigation, it > seems the callback is only executed after future calls to poll, which in a > regular polling loop with coroutines will never happen because of the > suspension on `commitAsync`, so we have a deadlock. I guess the idea behind > this Kafka consumer API design is that consuming new messages may continue, > even though commits of previous offsets (which happened an arbitrarily long > amount of time in the past) have not necessarily been processed. However, > with a coroutine based API, the commitAsync can be sequential before the next > poll like commitSync, but happen asynchronously without tying up a client > application thread. 
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737710#comment-17737710 ] Kirk True commented on KAFKA-14972: --- cc [~pnee] [~lianetm] > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely.
[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14972: -- Labels: needs-kip (was: ) > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely.
[GitHub] [kafka] jsancio commented on a diff in pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions
jsancio commented on code in PR #13910: URL: https://github.com/apache/kafka/pull/13910#discussion_r1243870229 ## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ## @@ -207,7 +211,7 @@ public void testTriggerLeaderEpochBumpIfNeeded() { createFooBuilder() .setTargetIsrWithBrokerStates( AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))) -.setBumpLeaderEpochOnIsrShrink(true), +.enableBumpLeaderEpochOnIsrShrink(true), Review Comment: Got it. Outside the scope of this PR but it looks like we are missing some validation. This is adding 4 to the ISR but 4 is not a replica for this partition. cc @ahuang98 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -300,9 +288,13 @@ private void tryElection(PartitionChangeRecord record) { */ void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) { if (record.leader() == NO_LEADER_CHANGE) { +boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled; + if (!Replicas.contains(targetReplicas, partition.replicas)) { +// Reassignment record.setLeader(partition.leader); -} else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) { +} else if (!Replicas.contains(targetIsr, partition.isr) && bumpLeaderEpochOnIsrShrink) { Review Comment: We should check `bumpLeaderEpochOnIsrShrink` first as it is faster to compute. It avoids a search through the ISR arrays when it is false (most cases after MV 3.6).
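The ordering point in the review above — put the cheap flag before the array scan so short-circuit evaluation skips the search in the common case — can be seen in a toy example. `isrContains` is only a stand-in that mimics the shape of `Replicas.contains`:

```java
import java.util.Arrays;

public class ShortCircuitDemo {
    static int scans = 0;

    // Stand-in for Replicas.contains: a linear scan over the ISR arrays.
    static boolean isrContains(int[] targetIsr, int[] isr) {
        scans++;
        return Arrays.stream(targetIsr)
                     .allMatch(r -> Arrays.stream(isr).anyMatch(i -> i == r));
    }

    public static void main(String[] args) {
        int[] targetIsr = {1, 2};
        int[] isr = {1, 2, 3};
        boolean bumpOnShrink = false; // the common case after MV 3.6

        // Cheap flag first: the scan never runs when the flag is false.
        if (bumpOnShrink && !isrContains(targetIsr, isr)) { /* bump epoch */ }
        System.out.println(scans); // 0

        // Scan first: pays for the search even though the flag decides the outcome.
        if (!isrContains(targetIsr, isr) && bumpOnShrink) { /* bump epoch */ }
        System.out.println(scans); // 1
    }
}
```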
[GitHub] [kafka] kirktrue commented on pull request #13917: MINOR; Failed move should be logged at WARN
kirktrue commented on PR #13917: URL: https://github.com/apache/kafka/pull/13917#issuecomment-1609649429 @jsancio Another difference is that now the `outer` exception's stack trace will be shown via `WARN` instead of just the exception's message via `DEBUG`. I assume that's intentional, but just wanted to call it out. Thanks!
[GitHub] [kafka] drawxy commented on a diff in pull request #13847: KAFKA-15082: The log retention policy doesn't take effect after altering log dir
drawxy commented on code in PR #13847: URL: https://github.com/apache/kafka/pull/13847#discussion_r1243863992 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1808,7 +1809,10 @@ class ReplicaManager(val config: KafkaConfig, // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move // replica from source dir to destination dir - logManager.abortAndPauseCleaning(topicPartition) + replicaAlterLogDirsManager.getFetcher(topicPartition) match { Review Comment: Recently, I deployed this change to my cluster and found that it didn't work in the scenario where an exception was raised. During altering, the partition whose log dir was being altered would be marked as failed due to exceptions, and its fetcher would be removed **without resuming the cleaning**. After receiving a LeaderAndIsr request for that partition, the altering would be restarted by assigning a fetcher to it, and the cleaning of that partition would be paused one more time because no fetcher was assigned. Therefore, the cleaning would never be resumed (paused twice, but resumed only once after altering completed). I pushed another commit: every time a fetcher is assigned to a partition, pause the cleaning of that partition, and every time the fetcher is removed, resume it, so that cleaning is resumed as many times as it is paused. If this change makes sense to you, I will add a unit test later.
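The invariant the comment is after — resume cleaning exactly as many times as it was paused — is the kind of thing a reference count captures. A toy sketch of that bookkeeping (illustrative only, not the actual LogCleanerManager code):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PausedCleaningCounter {
    // Count of outstanding pauses; cleaning may only restart when it
    // drops back to zero.
    private final AtomicInteger pauseCount = new AtomicInteger();

    void pauseCleaning() {
        pauseCount.incrementAndGet();
    }

    // Returns true only when this resume actually re-enables cleaning.
    boolean resumeCleaning() {
        return pauseCount.decrementAndGet() == 0;
    }

    public static void main(String[] args) {
        PausedCleaningCounter counter = new PausedCleaningCounter();
        counter.pauseCleaning(); // initial alter-log-dir pause
        counter.pauseCleaning(); // second pause after the fetcher is re-assigned
        System.out.println(counter.resumeCleaning()); // false: one pause still outstanding
        System.out.println(counter.resumeCleaning()); // true: cleaning may resume
    }
}
```

Pairing every fetcher assignment with a pause and every removal with a resume keeps this counter balanced even when altering is aborted and restarted.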
[GitHub] [kafka] kirktrue commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
kirktrue commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609641313 @flashmouse I'm a little slow on the uptake, so I'm trying to come up with a scenario. Let's say the following is true: * Topic `foo` has three partitions * The consumer group has two consumers, `consumer` and `otherConsumer` In the first case, let's say that `consumer` has one partition and `otherConsumer` has two. In that case it's as balanced as possible, right? But the code in `isBalanced` as written disagrees: ```java if (consumerPartitionCount < otherConsumerPartitionCount) { return false; } ``` because `1` is less than `2`. But it's going to run into problems: by returning `false` from `isBalanced`, I assume we then trigger a rebalance, which, at best, will cause the ownership of that partition to move from `otherConsumer` to `consumer`. When the next balance check is run, we're in the same position 🤷‍♂️ But with your fix: ```java if (consumerPartitionCount + 1 < otherConsumerPartitionCount) { return false; } ``` it's effectively stating that there has to be a difference of _at least_ two partitions to be considered unbalanced, correct? Because `1 + 1` is _not less_ than `2`. Do I have the above (mostly) correct? Thanks!
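The two predicates being compared in the comment above can be checked with a minimal standalone sketch. The method names are hypothetical; this is not the actual `AbstractStickyAssignor` code, just the two comparison expressions quoted in the PR discussion.

```java
// Minimal sketch comparing the original and fixed balance predicates
// discussed above (hypothetical names, not the real assignor code).
public class BalanceCheck {
    // Original check: any difference at all is treated as unbalanced,
    // so a 1/2 split over three partitions rebalances forever.
    static boolean isBalancedOriginal(int consumerPartitionCount, int otherConsumerPartitionCount) {
        return !(consumerPartitionCount < otherConsumerPartitionCount);
    }

    // Proposed fix: only a difference of two or more partitions
    // counts as unbalanced.
    static boolean isBalancedFixed(int consumerPartitionCount, int otherConsumerPartitionCount) {
        return !(consumerPartitionCount + 1 < otherConsumerPartitionCount);
    }
}
```

With three partitions split 1/2, the original predicate reports unbalanced (and would keep triggering rebalances), while the fixed predicate accepts the split; a 1/3 split is still reported as unbalanced by both.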
[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737696#comment-17737696 ] Josep Prat commented on KAFKA-15105: Hi [~riedelmax], feel free to assign this issue to yourself :) {quote}I'm still trying to understand how the build infrastructure works. Can someone give me a hint, how to reproduce the behavior? {quote} This is part of the problem: many times these issues are not easily reproducible on your machine. Usually they occur on CI. The test itself is under the core module, so one way to try to reproduce it would be running `./gradlew core:test` or `./gradlew core:test --tests integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable` repeatedly until failure. IntelliJ (if you have it and use it) has a similar feature where one can run a test until failure. Is this answering your question? Or did you want to know some other specific details? > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 3.5.0 > Reporter: Josep Prat > Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] > The error might be caused because a previous Kafka cluster used for > another test wasn't cleaned up properly before this one ran. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kirktrue commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
kirktrue commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609604468 @ableegoldman you're pretty familiar with this code, IIUC. If so, could you take a look at this PR? Thanks!
[GitHub] [kafka] kirktrue commented on a diff in pull request #13921: MINOR: Reduce (hopefully) flakiness of testRackAwareRangeAssignor
kirktrue commented on code in PR #13921: URL: https://github.com/apache/kafka/pull/13921#discussion_r1243822717 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -249,6 +249,9 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { reassignments.put(new TopicPartition(topicWithSingleRackPartitions, p), util.Optional.of(newAssignment)) } admin.alterPartitionReassignments(reassignments).all().get(30, TimeUnit.SECONDS) + TestUtils.waitUntilTrue( + () => admin.listPartitionReassignments().reassignments().get().isEmpty, Review Comment: Do we want to pass in a timeout to `get` in case it hangs? ```suggestion () => admin.listPartitionReassignments().reassignments().get(30, TimeUnit.SECONDS).isEmpty, ```
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1243806647 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1072,1338 @@ public void replay(
         consumerGroup.updateMember(newMember);
     }
 }
+
+    /**
+     * Replays GroupMetadataKey/Value to update the soft state of
+     * the generic group.
+     *
+     * @param key   A GroupMetadataKey key.
+     * @param value A GroupMetadataValue record.
+     */
+    public void replay(
+        GroupMetadataKey key,
+        GroupMetadataValue value,
+        short version
+    ) {
+        String groupId = key.group();
+
+        if (value == null) {
+            // Tombstone. Group should not be added.
+            // TODO: this needs to be checked in conjunction with empty group offsets.
+            // if (groups.containsKey(groupId)) {
+            //     throw new IllegalStateException("Unexpected unload of active group " + groupId +
+            //         "while loading partition " + topicPartition);
+            // }
+        } else {
+            List<GenericGroupMember> loadedMembers = new ArrayList<>();
+            for (GroupMetadataValue.MemberMetadata member : value.members()) {
+                int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout();
+
+                JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+                supportedProtocols.add(new JoinGroupRequestProtocol()
+                    .setName(value.protocol())
+                    .setMetadata(member.subscription()));
+
+                GenericGroupMember loadedMember = new GenericGroupMember(
+                    member.memberId(),
+                    Optional.ofNullable(member.groupInstanceId()),
+                    member.clientId(),
+                    member.clientHost(),
+                    rebalanceTimeout,
+                    member.sessionTimeout(),
+                    value.protocolType(),
+                    supportedProtocols,
+                    member.assignment()
+                );
+
+                loadedMembers.add(loadedMember);
+            }
+
+            String protocolType = value.protocolType();
+
+            GenericGroup genericGroup = new GenericGroup(
+                this.logContext,
+                groupId,
+                loadedMembers.isEmpty() ? EMPTY : STABLE,
+                time,
+                value.generation(),
+                protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
+                Optional.ofNullable(value.protocol()),
+                Optional.ofNullable(value.leader()),
+                value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+            );
+
+            loadedMembers.forEach(member -> {
+                genericGroup.add(member, null);
+                log.info("Loaded member {} in group {} with generation {}.",
+                    member.memberId(), groupId, genericGroup.generationId());
+            });
+
+            genericGroup.setSubscribedTopics(
+                genericGroup.computeSubscribedTopics()
+            );
+        }
+    }
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is UNKNOWN. If member
+            // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            try {
+                group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1243795683 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -609,7 +673,7 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon } public synchronized void transitionToUninitialized(RuntimeException exception) { -transitionTo(State.UNINITIALIZED); +transitionTo(State.UNINITIALIZED, exception); Review Comment: Good catch. That was me being sloppy/assumptive. `transitionTo` doesn't look at the incoming exception unless the target state is `FATAL_ERROR` or `ABORTABLE_ERROR`, so this was effectively a no-op anyway 🤷‍♂️. I've removed the exception from the call to `transitionTo`.
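The behavior described in the review comment above can be sketched as a toy state machine. The enum values loosely mirror the TransactionManager states mentioned in the thread, but this is an illustrative sketch under that assumption, not the actual producer code: the exception argument is recorded only when the target state is an error state, which is why passing it on the transition to `UNINITIALIZED` was a no-op.

```java
// Hypothetical sketch of the transitionTo behavior discussed above:
// the exception is only kept for error-state transitions.
public class StateMachineSketch {
    public enum State { UNINITIALIZED, READY, ABORTABLE_ERROR, FATAL_ERROR }

    private State state = State.READY;
    private RuntimeException lastError;

    public void transitionTo(State target, RuntimeException exception) {
        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
            // Only error states record the cause; otherwise the argument is ignored.
            lastError = exception;
        }
        state = target;
    }

    public State state() { return state; }
    public RuntimeException lastError() { return lastError; }
}
```

Under this model, `transitionTo(State.UNINITIALIZED, exception)` and `transitionTo(State.UNINITIALIZED, null)` are observably identical, matching the "effectively a no-op" observation in the comment.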
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # 
{color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew) # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947) # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) # {color:#00875a}TimeOrderedWindowStoreTest{color} 
(owners: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777] *The coverage report for the above tests after the change should be >= to what the coverage is now.*
[GitHub] [kafka] clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests URL: https://github.com/apache/kafka/pull/12607
[GitHub] [kafka] mumrah closed pull request #12883: Kip 866 part 1
mumrah closed pull request #12883: Kip 866 part 1 URL: https://github.com/apache/kafka/pull/12883
[GitHub] [kafka] dajac commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on PR #13880: URL: https://github.com/apache/kafka/pull/13880#issuecomment-1609431175 @jolshan @jeffkbkim Thanks for your review. I have addressed your comments.
[GitHub] [kafka] mumrah commented on a diff in pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions
mumrah commented on code in PR #13910: URL: https://github.com/apache/kafka/pull/13910#discussion_r1243667664 ## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ## @@ -207,7 +211,7 @@ public void testTriggerLeaderEpochBumpIfNeeded() { createFooBuilder() .setTargetIsrWithBrokerStates( AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))) -.setBumpLeaderEpochOnIsrShrink(true), +.enableBumpLeaderEpochOnIsrShrink(true), Review Comment: This case is an ISR expansion, so we don't expect an epoch bump (I added a comment).
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243668510 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +new ApiMessageAndVersion( +new ConsumerGroupMetadataValue().setEpoch(10), +(short) 0 +) +); + +assertArrayEquals( +MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), +serializer.serializeKey(record) +); +} + +@Test +public void testSerializeValue() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +new ApiMessageAndVersion( +new ConsumerGroupMetadataValue().setEpoch(10), +(short) 0 +) +); + +assertArrayEquals( +MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()), +serializer.serializeValue(record) +); +} + +@Test +public void testSerializeNullValue() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +null +); + +assertNull(serializer.serializeValue(record)); +} + +@Test +public void testDeserialize() { +RecordSerde serDe = new RecordSerde(); + +ApiMessageAndVersion key = new ApiMessageAndVersion( +new
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243666829 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +import static
org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +new ApiMessageAndVersion( +new ConsumerGroupMetadataValue().setEpoch(10), +(short) 0 +) +); + +assertArrayEquals( +MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), +serializer.serializeKey(record) +); +} + +@Test +public void testSerializeValue() { Review Comment: see my previous answer.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243666154 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +import static
org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 Review Comment: The version of the key does not really matter here but let's use 3. Regarding your suggestion to add a test with a different value, I don't see the need because the value is ignored anyway here.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243663487 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param <T> The type of the record. */ -public interface CoordinatorLoader<T> { +public interface CoordinatorLoader<T> extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { +private final short unknownType; + +public UnknownRecordTypeException(short unknownType) { +super(String.format("Found an unknown record type %d", unknownType)); +this.unknownType = unknownType; +} + +public short unknownType() { +return unknownType; Review Comment: It is also used in CoordinatorLoaderImpl.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243662918 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param The type of the record. */ -public interface CoordinatorLoader { +public interface CoordinatorLoader extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { Review Comment: We had something similar here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1376.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243661641 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. 
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(Record record) { +// Tombstone is represented with a null value. +if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public Record deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeException { +final short recordType = readVersion(keyBuffer, "key"); +final ApiMessage keyMessage = apiMessageKeyFor(recordType); +readMessage(keyMessage, keyBuffer, recordType, "key"); + +if (valueBuffer == null) { +return new Record(new ApiMessageAndVersion(keyMessage, recordType), null); +} + +final ApiMessage valueMessage = apiMessageValueFor(recordType); +final short valueVersion = readVersion(valueBuffer, "value"); Review Comment: Right. The record type is taken from the key whereas the version in the value is really the version of the message serialized in the value. This is weird, I agree...
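[Editorial sketch] The framing this comment describes — a 2-byte prefix on each buffer, where the key's prefix is the record *type* and the value's prefix is the value message's *version* — can be illustrated with plain `ByteBuffer` operations. This is an assumption-laden toy, not the real code, which uses `MessageUtil.toVersionPrefixedBytes` and the generated message classes:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: the key buffer is prefixed with the record *type*,
// the value buffer with the message *version* -- two different shorts
// that happen to share the same wire framing.
public class VersionPrefixDemo {
    static ByteBuffer prefixed(short prefix, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(prefix); // 2-byte prefix, like MessageUtil.toVersionPrefixedBytes
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] key = "group".getBytes(StandardCharsets.UTF_8);
        ByteBuffer keyBuffer = prefixed((short) 3, key);            // record type 3
        ByteBuffer valueBuffer = prefixed((short) 0, new byte[0]);  // value version 0

        short recordType = keyBuffer.getShort();     // selects key/value schemas
        short valueVersion = valueBuffer.getShort(); // version of the serialized value
        System.out.println(recordType + " " + valueVersion);
    }
}
```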
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243660584 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. 
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { Review Comment: Yeah, I was thinking about this as well but decided against it in the end. I still hope that we could introduce an `apiKey` field in the schemas like we did for the "metadata" records.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243657611 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. 
+ */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { Review Comment: This does not seem necessary. The package name seems to be enough.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243657016 ## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.log.UnifiedLog +import kafka.server.ReplicaManager +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata} +import org.apache.kafka.test.TestUtils.assertFutureThrows +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} +import org.junit.jupiter.api.{Test, Timeout} +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.ByteBuffer +import java.nio.charset.Charset +import java.util.concurrent.{CountDownLatch, TimeUnit} + +class StringKeyValueDeserializer extends CoordinatorLoader.Deserializer[(String, String)] { + override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, String) = { +( + Charset.defaultCharset().decode(key).toString, + Charset.defaultCharset().decode(value).toString +) + } +} + +@Timeout(60) +class CoordinatorLoaderImplTest { + @Test + def testNonexistentPartition(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(None) + + val result = loader.load(tp, coordinator) + assertFutureThrows(result, classOf[NotLeaderOrFollowerException]) +} 
+ } + + @Test + def testLoadingIsRejectedWhenClosed(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + loader.close() + + val result = loader.load(tp, coordinator) + assertFutureThrows(result, classOf[RuntimeException]) +} + } + + @Test + def testLoading(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( +new SimpleRecord("k1".getBytes, "v1".getBytes), +new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( +startOffset = 0L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult1) + + val readResult2 = logReadResult(startOffset = 2, records = Seq( +new SimpleRecord("k3".getBytes, "v3".getBytes), +new
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243649444 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{FileRecords, MemoryRecords} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException} +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.storage.internals.log.FetchIsolation + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters._ + +/** + * Coordinator loader which reads records from a partition and replays them + * to a group coordinator. + * + * @param replicaManager The replica manager. + * @param deserializerThe deserializer to use. 
+ * @param loadBufferSize The load buffer size. + * @tparam T The record type. + */ +class CoordinatorLoaderImpl[T]( + replicaManager: ReplicaManager, + deserializer: Deserializer[T], + loadBufferSize: Int +) extends CoordinatorLoader[T] with Logging { + private val isRunning = new AtomicBoolean(true) + private val scheduler = new KafkaScheduler(1) + scheduler.startup() + + /** + * Loads the coordinator by reading all the records from the TopicPartition + * and applying them to the Replayable object. + * + * @param tp The TopicPartition to read from. + * @param coordinator The object to apply records to. + */ + override def load( +tp: TopicPartition, +coordinator: CoordinatorPlayback[T] +): CompletableFuture[Void] = { +val future = new CompletableFuture[Void]() +val result = scheduler.scheduleOnce(s"Load coordinator from $tp", + () => doLoad(tp, coordinator, future)) +if (result.isCancelled) { + future.completeExceptionally(new RuntimeException("Coordinator loader is closed.")) +} +future + } + + private def doLoad( +tp: TopicPartition, +coordinator: CoordinatorPlayback[T], +future: CompletableFuture[Void] + ): Unit = { +try { + replicaManager.getLog(tp) match { +case None => + future.completeExceptionally(new NotLeaderOrFollowerException( +s"Could not load records from $tp because the log does not exist.")) + +case Some(log) => + def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L) + + // buffer may not be needed if records are read from memory + var buffer = ByteBuffer.allocate(0) + // loop breaks if leader changes at any time during the load, since logEndOffset is -1 + var currOffset = log.logStartOffset + // loop breaks if no records have been read, since the end of the log has been reached + var readAtLeastOneRecord = true + + while (currOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) { +val fetchDataInfo = log.read( + startOffset = currOffset, + maxLength = loadBufferSize, + isolation = FetchIsolation.LOG_END, + 
minOneMessage = true +) + +readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 + +val memoryRecords = (fetchDataInfo.records: @unchecked) match { + case records: MemoryRecords => +records + + case fileRecords: FileRecords => +val sizeInBytes = fileRecords.sizeInBytes +val bytesNeeded = Math.max(loadBufferSize, sizeInBytes) + +// minOneMessage = true in the above log.read means that the buffer may need to +// be grown to ensure progress can be made. +if (buffer.capacity < bytesNeeded) { + if (loadBufferSize <
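[Editorial sketch] The buffer-growth rule in the quoted load loop — because the read uses `minOneMessage = true`, a single batch may exceed `loadBufferSize`, so the reusable buffer grows to whichever of the two is larger — can be isolated into a small sketch. This is a hedged Java illustration of that one rule, not the actual Scala implementation:

```java
import java.nio.ByteBuffer;

// Hedged sketch of the buffer-growth rule in the quoted loader: with
// minOneMessage = true a fetched batch may be larger than loadBufferSize,
// so the reusable buffer grows to max(loadBufferSize, batchSizeInBytes).
public class LoadBufferDemo {
    static ByteBuffer ensureCapacity(ByteBuffer buffer, int loadBufferSize, int batchSizeInBytes) {
        int bytesNeeded = Math.max(loadBufferSize, batchSizeInBytes);
        if (buffer.capacity() < bytesNeeded) {
            // Oversized batch: grow once so the single large batch still loads.
            buffer = ByteBuffer.allocate(bytesNeeded);
        }
        buffer.clear();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(0);
        buf = ensureCapacity(buf, 1000, 250);  // small batch: grows to loadBufferSize
        System.out.println(buf.capacity());    // 1000
        buf = ensureCapacity(buf, 1000, 4096); // oversized batch: grows past loadBufferSize
        System.out.println(buf.capacity());    // 4096
    }
}
```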
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243647581 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{FileRecords, MemoryRecords} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException} +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.storage.internals.log.FetchIsolation + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters._ + +/** + * Coordinator loader which reads records from a partition and replays them + * to a group coordinator. + * + * @param replicaManager The replica manager. + * @param deserializerThe deserializer to use. 
+ * @param loadBufferSize The load buffer size. + * @tparam T The record type. + */ +class CoordinatorLoaderImpl[T]( + replicaManager: ReplicaManager, + deserializer: Deserializer[T], + loadBufferSize: Int +) extends CoordinatorLoader[T] with Logging { + private val isRunning = new AtomicBoolean(true) + private val scheduler = new KafkaScheduler(1) + scheduler.startup() + + /** + * Loads the coordinator by reading all the records from the TopicPartition + * and applying them to the Replayable object. + * + * @param tp The TopicPartition to read from. + * @param coordinator The object to apply records to. + */ + override def load( +tp: TopicPartition, +coordinator: CoordinatorPlayback[T] +): CompletableFuture[Void] = { +val future = new CompletableFuture[Void]() +val result = scheduler.scheduleOnce(s"Load coordinator from $tp", + () => doLoad(tp, coordinator, future)) +if (result.isCancelled) { Review Comment: Correct.
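[Editorial sketch] The behaviour being confirmed here — once the scheduler is shut down, a newly scheduled load is cancelled and the returned future completes exceptionally instead of hanging — can be approximated with a plain `ScheduledExecutorService` standing in for `KafkaScheduler` (the error message mirrors the quoted Scala code; everything else is illustrative):

```java
import java.util.concurrent.*;

// Hedged sketch of the load-scheduling pattern in the quoted code: if the
// scheduler has been shut down, the returned future fails immediately.
public class LoadScheduleDemo {
    static CompletableFuture<Void> load(ScheduledExecutorService scheduler) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            // doLoad(tp, coordinator, future) would run here in the real loader.
            scheduler.execute(() -> future.complete(null));
        } catch (RejectedExecutionException e) {
            // Mirrors the `result.isCancelled` branch in the Scala code.
            future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
        }
        return future;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        load(scheduler).get(); // completes normally while the scheduler is running
        scheduler.shutdown();
        CompletableFuture<Void> failed = load(scheduler);
        System.out.println(failed.isCompletedExceptionally()); // true
    }
}
```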
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243647232 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{FileRecords, MemoryRecords} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException} +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.storage.internals.log.FetchIsolation + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters._ + +/** + * Coordinator loader which reads records from a partition and replays them + * to a group coordinator. + * + * @param replicaManager The replica manager. + * @param deserializerThe deserializer to use. 
+ * @param loadBufferSize The load buffer size. + * @tparam T The record type. + */ +class CoordinatorLoaderImpl[T]( Review Comment: Indeed, it will be created in BrokerServer when the coordinator is created.
[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)
cadonna commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1243468362 ## streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java: ## @@ -189,7 +221,22 @@ public Joined withOtherValueSerde(final Serde otherValueSerde) { */ @Override public Joined withName(final String name) { -return new Joined<>(keySerde, valueSerde, otherValueSerde, name); +return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); +} + +/** + * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window. Review Comment: Could you please also add a couple of line breaks to the java docs as you did for the javadocs of the `with()` method? 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ## @@ -56,10 +77,59 @@ public void init(final ProcessorContext context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); +internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); +if (useBuffer) { +if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { +throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); +} + +buffer.get().setSerdesIfNull(new SerdeGetter(context)); + buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null); +} } @Override public void process(final Record record) { +internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); +updateObservedStreamTime(record.timestamp()); +if (maybeDropRecord(record)) { +return; +} + +if (!useBuffer) { +doJoin(record); +} else { +if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) { +doJoin(record); +} +buffer.get().evictWhile(() -> true, this::emit); Review Comment: I think it should be done the following way to avoid an unnecessary range query: ```suggestion if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) { doJoin(record); } else { buffer.get().evictWhile(() -> true, this::emit); } ``` If the record is a late record, it will not update the observed stream time. If the observed stream time is not updated, the range query will not return records that need to be evicted, since they have already been evicted the last time `evictWhile()` was called. Does this make sense? If you agree, could you also please add a test for this? 
## streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java: ## @@ -203,4 +250,4 @@ public Serde valueSerde() { public Serde otherValueSerde() { return otherValueSerde; } -} +} Review Comment: nit: Could remove this change? ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ## @@ -112,6 +125,74 @@ private void pushNullValueToTable() { } } + +private void makeJoin(final Duration grace) { +final KStream stream; +final KTable table; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +builder = new StreamsBuilder(); + +final Consumed consumed = Consumed.with(Serdes.Integer(), Serdes.String()); +stream = builder.stream(streamTopic, consumed); +table = builder.table("tableTopic2", consumed, Materialized.as( +Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5; +stream.join(table, +MockValueJoiner.TOSTRING_JOINER, +Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) +).process(supplier); +final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); +driver = new TopologyTestDriver(builder.build(), props); +inputStreamTopic
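[Editorial sketch] The grace-period semantics discussed in this review — records within the grace period are buffered and released in timestamp order once stream time has advanced past them, while late records bypass the buffer and join immediately — can be sketched with a toy timestamp-ordered buffer. This is a simplified stand-in, not the Streams `TimeOrderedKeyValueBuffer`, and the field and method names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of grace-period buffering: in-grace records wait in a
// timestamp-ordered buffer; late records are processed immediately,
// possibly joining against a stale table version.
public class GraceBufferDemo {
    final long graceMs;
    long observedStreamTime = Long.MIN_VALUE;
    final PriorityQueue<long[]> buffer = new PriorityQueue<>(Comparator.comparingLong(r -> r[0]));

    GraceBufferDemo(long graceMs) { this.graceMs = graceMs; }

    /** Returns the timestamps that are ready to be joined after this record arrives. */
    List<Long> process(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        List<Long> ready = new ArrayList<>();
        if (timestamp < observedStreamTime - graceMs) {
            ready.add(timestamp); // late record: joined right away, bypassing the buffer
            return ready;
        }
        buffer.add(new long[]{timestamp});
        // Evict everything that has fallen out of the grace window, in timestamp order.
        while (!buffer.isEmpty() && buffer.peek()[0] <= observedStreamTime - graceMs) {
            ready.add(buffer.poll()[0]);
        }
        return ready;
    }

    public static void main(String[] args) {
        GraceBufferDemo demo = new GraceBufferDemo(10);
        System.out.println(demo.process(0));  // buffered -> []
        System.out.println(demo.process(5));  // buffered -> []
        System.out.println(demo.process(20)); // evicts ts <= 10 -> [0, 5]
    }
}
```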
[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609327501 Another point I want to call out is that the Yammer metrics histogram is notorious for consuming CPU (and increasing latency). It consumes ~4-5% CPU on the network threads for calculating per-request latency. Given that we are doing this on the latency-critical append path (right?), I suspect we may encounter some latency increase + CPU increase. Although one histogram calculation here should be OK, it would be nice if you could get some producer-perf.sh data in as well, to ensure that this metric isn't adversely impacting latency. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1243570850

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:

@@ -17,25 +17,37 @@ package kafka.server
+import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName}
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
import java.util
+import java.util.concurrent.TimeUnit
import scala.collection.mutable

object AddPartitionsToTxnManager {
  type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+ val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment: Do we want to add a tag for the error code as well? That would give us per-error-code failure rates.
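To illustrate what a per-error-code tag would buy, here is a toy counter keyed by error code — a hypothetical stand-in only, not Kafka's `KafkaMetricsGroup` API (which, assuming its tag-accepting overloads, would expose one meter per tag combination, e.g. `VerificationFailureRate{error=INVALID_TXN_STATE}`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: track verification failures per error code, the way a
// tagged meter would, so operators can tell which error dominates the rate.
class TaggedFailureCounter {
    private final Map<String, LongAdder> byErrorCode = new ConcurrentHashMap<>();

    void markFailure(String errorCode) {
        byErrorCode.computeIfAbsent(errorCode, k -> new LongAdder()).increment();
    }

    long count(String errorCode) {
        LongAdder a = byErrorCode.get(errorCode);
        return a == null ? 0 : a.sum();
    }
}
```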
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1243569174

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:

@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
  private val inflightNodes = mutable.HashSet[Node]()
  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+ val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment: When we bump the version beyond 4 (in the future), say to 5 and 6, wouldn't we want the ability to look at failure metrics for version 5 only? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
yashmayya commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1243145305

## docs/connect.html:

@@ -313,7 +313,13 @@ REST API
DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details)
+Offsets management REST APIs (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details):
+
+GET /connectors/{name}/offsets - get the current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state.
+PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state. The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets REST API.

Review Comment: There is a link to the generated OpenAPI docs at the end of this section, so I don't think it's necessary to add the same link here as well. Also, I think an actual example might be more helpful than the generated OpenAPI spec, where the finest level of granularity is the [ConnectorOffset](https://github.com/apache/kafka/blob/c5889fceddb9a0174452ae60a57c8ff3f087a6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java#L41) schema describing the `partition` and `offset` keys having JSON object values. 
I was hoping that the link to the KIP would be sufficient, but I do see the value of including actual examples directly in the docs as well.
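For readers looking for the concrete shape of the request, here is a hedged sketch of the `PATCH /connectors/{name}/offsets` body described in KIP-875. The `kafka_topic`/`kafka_partition`/`kafka_offset` keys are the sink-connector form; source connectors use connector-defined keys inside `partition` and `offset`:

```java
// Builds an example body for PATCH /connectors/{name}/offsets per KIP-875.
// Plain string assembly is used to keep the sketch dependency-free; a real
// client would use a JSON library.
class OffsetsPatchBody {
    static String sinkOffset(String topic, int partition, long offset) {
        return "{\n"
            + "  \"offsets\": [\n"
            + "    {\n"
            + "      \"partition\": {\"kafka_topic\": \"" + topic + "\", \"kafka_partition\": " + partition + "},\n"
            + "      \"offset\": {\"kafka_offset\": " + offset + "}\n"
            + "    }\n"
            + "  ]\n"
            + "}";
    }
}
```

The same `offsets` array shape is what `GET /connectors/{name}/offsets` returns, which is why the docs describe the PATCH body as "similar to the response body" of the GET endpoint.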
[jira] [Commented] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability
[ https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737576#comment-17737576 ] Kamal Chandraprakash commented on KAFKA-15128: -- The snappy jar is used to compress records from the producer. If you're not setting {{compression.type}} to {{snappy}} in your producer configuration, then you can safely exclude this jar from your distribution. https://github.com/apache/kafka/blob/trunk/build.gradle#L1342 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java#L89 > snappy-java-1.1.8.4.jar library vulnerability > - > > Key: KAFKA-15128 > URL: https://issues.apache.org/jira/browse/KAFKA-15128 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: priyatama >Priority: Major > Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png > > > Hi Team, > we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we > need to get rid of it. > !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! > during analysis, we found snappy-java coming via kafka-clients. > As our application is not directly using snappy-java jar. > Can any one please explain what is use of snappy-java in kafka-client or can > we exclude that? > Latest kafka-client also having vulnerable snappy-jar, by when kafka-client > will release next version which is having non-vulnerable snappy-java jar in > it? > cc: [Mickael > Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison] -- This message was sent by Atlassian Jira (v8.20.10#820010)
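Assuming the application really never uses snappy compression, the transitive dependency can be excluded on the consumer side. A hedged Gradle sketch (the coordinates `org.xerial.snappy:snappy-java` are the ones kafka-clients pulls in; the kafka-clients version is illustrative):

```groovy
// Hypothetical build.gradle fragment: drop snappy-java from kafka-clients.
// Safe only if no producer in this application sets compression.type=snappy
// and no consumer reads snappy-compressed topics (decompression needs it too).
dependencies {
    implementation('org.apache.kafka:kafka-clients:3.4.0') {
        exclude group: 'org.xerial.snappy', module: 'snappy-java'
    }
}
```

If the jar is excluded but a snappy-compressed record batch is ever encountered, the client will fail at runtime with a `NoClassDefFoundError`, so the exclusion should be validated against real traffic.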
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1243425245

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:

@@ -90,30 +109,34 @@
    topicPartitionsToError.put(new TopicPartition(topic.name, partition), error)
    }
  }
+ verificationFailureRate.mark(topicPartitionsToError.size)
  topicPartitionsToError.toMap
  }
+ private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback, errorMap: Map[TopicPartition, Errors], startTime: Long): Unit = {

Review Comment: s/startTime/startTimeMs, so that it is clear what the units are.

## core/src/main/scala/kafka/network/RequestChannel.scala:

@@ -240,17 +240,18 @@
  val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
  val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
  val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
- val fetchMetricNames =
+ val overrideMetricNames =
  if (header.apiKey == ApiKeys.FETCH) {
-   val isFromFollower = body[FetchRequest].isFromFollower
-   Seq(
-     if (isFromFollower) RequestMetrics.followFetchMetricName
+   val specifiedMetricName =
+     if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName
      else RequestMetrics.consumerFetchMetricName
-   )
+   Seq(specifiedMetricName, header.apiKey.name)
+ } else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest()) {

Review Comment: nit: the `allVerifyOnlyRequest()` parentheses are optional in Scala.

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:

@@ -47,13 +55,19 @@
  private val inflightNodes = mutable.HashSet[Node]()
  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+ val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+ val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment: Thank you for the explanation. Makes sense. > 1\ use a biased histogram Could you also please respond to the comment around usage of a biased histogram?

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:

@@ -17,25 +17,37 @@ package kafka.server
+import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName}
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
import java.util
+import java.util.concurrent.TimeUnit
import scala.collection.mutable

object AddPartitionsToTxnManager {
  type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+ val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment: nit: constant variables start with a capital letter (Pascal case) in the Kafka code base, from what I have observed.
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609178487 @divijvaidya thank you for the reply! The unit test ``org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription`` can take much more time than the configured Timeout(90), and this is easy to reproduce when ``partitionCount`` and ``consumerCount`` are large enough and not equal (e.g. ``partitionCount`` = 200, ``consumerCount`` = 20); in this situation ``isBalanced`` and ``performReassignments`` may run the whole loop body. Because both ``performReassignments`` and ``isBalanced`` in ``AbstractStickyAssignor`` are not very efficient (``performReassignments`` is worse), and ``isBalanced`` can reduce the number of ``performReassignments`` calls, fixing its logic speeds up this unit test when the parameters are small enough. In my test, after this fix, ``testLargeAssignmentAndGroupWithNonEqualSubscription`` passes with ``partitionCount`` = 200 and ``consumerCount`` = 20, but it is still slower than the combination ``partitionCount`` = 2000, ``consumerCount`` = 2000, because in this test, when the two parameters are equal, ``isBalanced`` satisfies the predicate ``min >= max - 1`` and can return true immediately. Maybe I should add a unit test in a new commit?
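The `min >= max - 1` predicate mentioned above can be sketched in isolation — a simplified illustration, not the actual `AbstractStickyAssignor` code: when the smallest and largest per-consumer assignments differ by at most one partition, the assignment is already balanced and the expensive reassignment pass can be skipped.

```java
import java.util.Collections;
import java.util.List;

// Simplified balance check over per-consumer assignment sizes (hypothetical
// stand-in for the short-circuit in AbstractStickyAssignor.isBalanced).
class BalanceCheck {
    static boolean isBalanced(List<Integer> assignmentSizes) {
        int min = Collections.min(assignmentSizes);
        int max = Collections.max(assignmentSizes);
        // No reassignment can improve fairness once sizes differ by at most one.
        return min >= max - 1;
    }
}
```

With equal `partitionCount` and `consumerCount` every consumer gets the same number of partitions, the predicate holds immediately, and the loop exits — which is why the equal-parameter combination in the test is fast.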
[GitHub] [kafka] tombentley commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart
tombentley commented on code in PR #13862: URL: https://github.com/apache/kafka/pull/13862#discussion_r1243442987

## docs/quickstart.html:

@@ -154,9 +154,9 @@ By default, each line you enter will result in a separate event being written to the topic.
-$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
-This is my first event
-This is my second event
+$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
+$ This is my first event
+$ This is my second event

Review Comment: `kafka-console-producer.sh` has its own prompt, using a `>` character and without a space.
```suggestion
>This is my first event
>This is my second event
```

## docs/quickstart.html:

@@ -32,7 +32,7 @@ the latest Kafka release and extract it:
-$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz

Review Comment: The `<pre>` already has `class="language-bash"`, so I don't think we need it on the `<code>` as well.
[jira] [Updated] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability
[ https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] priyatama updated KAFKA-15128: -- Description: Hi Team, we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we need to get rid of it. !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! during analysis, we found snappy-java coming via kafka-clients. As our application is not directly using snappy-java jar. Can any one please explain what is use of snappy-java in kafka-client or can we exclude that? Latest kafka-client also having vulnerable snappy-jar, by when kafka-client will release next version which is having non-vulnerable snappy-java jar in it? cc: [Mickael Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison] was: Hi Team, we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we need to get rid of it. !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! during analysis, we found snappy-java coming via kafka-clients. As our application is not directly using snappy-java jar. Can any one please explain what is use of snappy-java in kafka-client or can we exclude that? Latest kafka-client also having vulnerable snappy-jar, by when kafka-client will release next version which is having non-vulnerable snappy-java jar in it? > snappy-java-1.1.8.4.jar library vulnerability > - > > Key: KAFKA-15128 > URL: https://issues.apache.org/jira/browse/KAFKA-15128 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: priyatama >Priority: Major > Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png > > > Hi Team, > we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we > need to get rid of it. > !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! > during analysis, we found snappy-java coming via kafka-clients. > As our application is not directly using snappy-java jar. 
> Can any one please explain what is use of snappy-java in kafka-client or can > we exclude that? > Latest kafka-client also having vulnerable snappy-jar, by when kafka-client > will release next version which is having non-vulnerable snappy-java jar in > it? > cc: [Mickael > Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison]
[jira] [Updated] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability
[ https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] priyatama updated KAFKA-15128: -- Priority: Major (was: Minor) > snappy-java-1.1.8.4.jar library vulnerability > - > > Key: KAFKA-15128 > URL: https://issues.apache.org/jira/browse/KAFKA-15128 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: priyatama >Priority: Major > Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png > > > Hi Team, > we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we > need to get rid of it. > !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! > during analysis, we found snappy-java coming via kafka-clients. > As our application is not directly using snappy-java jar. > Can any one please explain what is use of snappy-java in kafka-client or can > we exclude that? > Latest kafka-client also having vulnerable snappy-jar, by when kafka-client > will release next version which is having non-vulnerable snappy-java jar in > it?
[GitHub] [kafka] divijvaidya commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
divijvaidya commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609110377 Thank you for your first change to Apache Kafka @flashmouse! In the JIRA, you mention that this could be reproduced using a unit test. Can you please add the unit test here which would fail prior to this change and succeed after it?
[jira] [Created] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability
priyatama created KAFKA-15128: - Summary: snappy-java-1.1.8.4.jar library vulnerability Key: KAFKA-15128 URL: https://issues.apache.org/jira/browse/KAFKA-15128 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.4.0 Reporter: priyatama Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png Hi Team, we found new vulnerability introduced in snappy-java-1.1.8.4 library, so we need to get rid of it. !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230! during analysis, we found snappy-java coming via kafka-clients. As our application is not directly using snappy-java jar. Can any one please explain what is use of snappy-java in kafka-client or can we exclude that? Latest kafka-client also having vulnerable snappy-jar, by when kafka-client will release next version which is having non-vulnerable snappy-java jar in it?
[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15127: - Assignee: Sagar Rao > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion.
[jira] [Assigned] (KAFKA-15106) AbstractStickyAssignor may stuck in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-15106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] li xiangyuan reassigned KAFKA-15106: Assignee: li xiangyuan > AbstractStickyAssignor may stuck in 3.5 > --- > > Key: KAFKA-15106 > URL: https://issues.apache.org/jira/browse/KAFKA-15106 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > > This can be reproduced easily in a unit test: in > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription, > please set > partitionCount=200, > consumerCount=20, and you can see > isBalanced will return false forever.
[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
Sagar Rao created KAFKA-15127: - Summary: Allow offsets to be reset at the same time a connector is deleted. Key: KAFKA-15127 URL: https://issues.apache.org/jira/browse/KAFKA-15127 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Sagar Rao This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up which will allow connector names to be reused after connector deletion.