Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]
abhijeetk88 commented on code in PR #16071: URL: https://github.com/apache/kafka/pull/16071#discussion_r1621675519 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -6597,6 +6597,79 @@ class ReplicaManagerTest { )) } } + + @Test + def testRemoteReadQuotaExceeded(): Unit = { +when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true) + +val tp0 = new TopicPartition(topic, 0) +val tpId0 = new TopicIdPartition(topicId, tp0) +val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + +assertEquals(1, fetch.size) +assertEquals(tpId0, fetch.head._1) +val fetchInfo = fetch.head._2.info +assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, fetchInfo.fetchOffsetMetadata) +assertFalse(fetchInfo.records.records().iterator().hasNext) +assertFalse(fetchInfo.firstEntryIncomplete) +assertFalse(fetchInfo.abortedTransactions.isPresent) +assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + @Test + def testRemoteReadQuotaNotExceeded(): Unit = { +when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false) + +val tp0 = new TopicPartition(topic, 0) +val tpId0 = new TopicIdPartition(topicId, tp0) +val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + +assertEquals(1, fetch.size) +assertEquals(tpId0, fetch.head._1) +val fetchInfo = fetch.head._2.info +assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset) +assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset) +assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) +assertEquals(MemoryRecords.EMPTY, fetchInfo.records) +assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) +try { + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Map(tp.topic -> topicId).asJava + val leaderEpoch = 0 + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, +Seq( + new LeaderAndIsrPartitionState() +.setTopicName(tp.topic) +.setPartitionIndex(tp.partition) +.setControllerEpoch(0) +.setLeader(leaderEpoch) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632 Thanks @chia7712 @jolshan . Apologies for the miss. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] ignore [kafka]
github-actions[bot] commented on PR #15355: URL: https://github.com/apache/kafka/pull/15355#issuecomment-2141172395 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240 ## checkstyle/suppressions.xml: ## @@ -361,4 +361,7 @@ + + Review Comment: I think checkstyle should consistent with auto format . If you open the A module auto format, we should open the module check rule. `ImportOrder` rule can't custom in each module(It's going to take a lot of changes,maybe should add `build.gradle` every module). So i add this line in order that open rule some module in future,This is what I think is a more convenient way to modify by module -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240 ## checkstyle/suppressions.xml: ## @@ -361,4 +361,7 @@ + + Review Comment: I think checkstyle should consistent with auto format . If you open the A module auto format, we should open the module check rule. `ImportOrder` rule can't custom in each module(It's going to take a lot of changes,maybe should add build.gradle every module). So i add this line in order that open rule some module in future,This is what I think is a more convenient way to modify by module -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621614857 ## build.gradle: ## @@ -787,6 +800,12 @@ subprojects { skipProjects = [ ":jmh-benchmarks", ":trogdor" ] skipConfigurations = [ "zinc" ] } + + afterEvaluate { Review Comment: I change the PR,please review it @chia7712 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16861) Don't convert to group to classic if the size is larger than group max size
[ https://issues.apache.org/jira/browse/KAFKA-16861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850899#comment-17850899 ] TengYao Chi commented on KAFKA-16861: - I will handle this issue :) > Don't convert to group to classic if the size is larger than group max size > --- > > Key: KAFKA-16861 > URL: https://issues.apache.org/jira/browse/KAFKA-16861 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > It should be one-line fix [0] > [0] > https://github.com/apache/kafka/blob/2d9994e0de915037525f041ff9a9b9325f838938/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L810 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621587957 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java: ## @@ -585,34 +586,34 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith )); // Initial subscriptions were [T1, T2] -Map members = new HashMap<>(); +Map members = new TreeMap<>(); +Map>> assignedPartitions = new HashMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0, 2), mkTopicAssignment(topic2Uuid, 1, 3) ); -members.put(memberA, new AssignmentMemberSpec( +assignedPartitions.put(memberA, currentAssignmentForA); +members.put(memberA, new MemberSubscriptionSpecImpl( Optional.empty(), -Optional.empty(), -Collections.singleton(topic1Uuid), -currentAssignmentForA +Collections.singleton(topic1Uuid) )); Map> currentAssignmentForB = mkAssignment( mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 0, 2, 4) ); -members.put(memberB, new AssignmentMemberSpec( -Optional.empty(), +assignedPartitions.put(memberB, currentAssignmentForB); +members.put(memberB, new MemberSubscriptionSpecImpl( Optional.empty(), -mkSet(topic1Uuid, topic2Uuid), -currentAssignmentForB +new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid)) )); GroupSpec groupSpec = new GroupSpecImpl( members, HETEROGENEOUS, -invertedTargetAssignment(members) +assignedPartitions, +invertedTargetAssignment(assignedPartitions, members) ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Review Comment: yep sounds good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240 ## checkstyle/suppressions.xml: ## @@ -361,4 +361,7 @@ + + Review Comment: I think checkstyle should consistent with auto format . If you open the A module auto format, we should open the module check rule. So i add this line in order that open rule some module in future -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621584578 ## build.gradle: ## @@ -47,7 +47,7 @@ plugins { // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed // artifacts - see https://github.com/johnrengelman/shadow/issues/901 id 'com.github.johnrengelman.shadow' version '8.1.0' apply false - id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 + id 'com.diffplug.spotless' version "${spotlessVersion}" apply false Review Comment: i write in `gradle.properties` comment, because after 6.13.0 require Java 11 , we should compatibility java 8 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773 @abhijeetk88 Can you resolve the conflicts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
satishd merged PR #16146: URL: https://github.com/apache/kafka/pull/16146 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
satishd commented on PR #16146: URL: https://github.com/apache/kafka/pull/16146#issuecomment-2141065172 A few failing tests are unrelated to the change. Merging this change to the trunk to unblock the test failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1621535872 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { -TopicPartition tp = new TopicPartition(r.name(), p.index()); +// Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. +String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name(); Review Comment: Yes. For the fetch request for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster that is running on a MV < 2.8, topics will not have IDs. So when we decide which version of produce we want to send, we want to be aware of this. Not only that, but even if the broker supports topic IDs on all topics, we also may have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce since it is a client only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure api compatibility. We just want to confirm these upgrade paths are compatible since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1621535872 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { -TopicPartition tp = new TopicPartition(r.name(), p.index()); +// Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. +String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name(); Review Comment: Yes. For the fetch request for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster that is running on a MV < 2.8, not all topics will have IDs. So when we decide which version of produce we want to send, we want to be aware of this. Not only that, but even if the broker supports topic IDs on all topics, we also may have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce since it is a client only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure api compatibility. We just want to confirm these upgrade paths are compatible since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { -final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); +final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); +final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { +LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", +rackAwareAssignmentStrategy); +return false; +} + +if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); Review Comment: We should log the exact config name since otherwise people won't necessarily know what this is referring to (especially since they already forgot to set this config). ```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { -final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); +final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); +final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { +LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", +rackAwareAssignmentStrategy); +return false; +} + +if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); return false; } + +if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the non-overlap cost configuration was not set."); Review Comment: ```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) { configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)), Review Comment: don't we need to check `if (assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))` and set these to the sticky assignor defaults if true? Where `assignorClassName` is equal to `streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess maybe we do want the public `AssignmentConfigs` constructor to take in the StreamsConfig after all? ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState,
Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]
showuon commented on code in PR #16085: URL: https://github.com/apache/kafka/pull/16085#discussion_r1621524702 ## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ## @@ -4251,6 +4251,22 @@ class UnifiedLogTest { assertEquals(new LogOffsetMetadata(14, -1L, -1), log.maybeConvertToOffsetMetadata(14)) } + @Test + def testGetFirstBatchTimestampForSegments(): Unit = { +val log = createLog(logDir, LogTestUtils.createLogConfig()) + +val segments: java.util.List[LogSegment] = new java.util.ArrayList[LogSegment]() +val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM) +val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM) Review Comment: Forgot to close them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621518800 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ## @@ -18,48 +18,60 @@ import org.apache.kafka.common.Uuid; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; /** * The assignment specification for a consumer group. */ public class GroupSpecImpl implements GroupSpec { /** - * The member metadata keyed by member Id. + * Member subscription metadata keyed by member Id. */ -private final Map members; +private final Map memberSubscriptions; /** - * The subscription type followed by the group. + * The subscription type of the group. */ private final SubscriptionType subscriptionType; +/** + * Partitions currently assigned to each member keyed by topicId. + */ +private final Map>> currentAssignment; Review Comment: I used memberAssigment and invertedMemberAssignment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621508931 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: discussed offline, we want to throw an IllegalStateException when the memberId is not found -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621497961 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -159,9 +158,7 @@ default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); -// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || Review Comment: Mostly confused because we don't check for the config in kafka apis anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621496853 ## metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java: ## @@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() { for (Features feature : Features.PRODUCTION_FEATURES) { expectedFeatures.put(feature.featureName(), VersionRange.of( 0, -feature.defaultValue(MetadataVersion.LATEST_PRODUCTION) +feature.defaultValue(MetadataVersion.latestTesting()) Review Comment: Hmm was this just a bug in the test... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1621488481 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: - I've moved KRaft tests into a new `AclCommandIntegrationTest.java` - Left old Zookeeper tests in `AclCommandTest.scala`. As I understand, we are going to completely delete this test file, once fully moved to KRaft, am I right? Do you think it's worth migrating those tests to java at this stage, if they are going to be deleted anyway? - Race condition described above, is still reproduced on a new infrastructure :cry: So if there are no objections, we can probably remove console output `Current ACLs` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621487824 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -159,9 +158,7 @@ default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); -// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || Review Comment: do we plan to remove this config value from GroupCoordinatorConfig? I see it was removed from a lot of files, but there are still a few where it is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
soarez commented on PR #15945: URL: https://github.com/apache/kafka/pull/15945#issuecomment-2140953114 There are some conflicts that need addressing, and the JDK 21 pipeline didn't run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
soarez commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1621485662 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. + * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param idThe ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { +if (log.isTraceEnabled()) { +log.trace("Metadata version change is present: {}", +delta.metadataVersionChanged()); +} +checkBrokerRegistration = true; +} +} +if (delta.clusterDelta() != null) { +if (delta.clusterDelta().changedBrokers().get(id) != null) { +if (log.isTraceEnabled()) { +log.trace("Broker change is present: {}", +delta.clusterDelta().changedBrokers().get(id)); +} +checkBrokerRegistration = true; +} +} +if (checkBrokerRegistration) { +if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), +delta.clusterDelta().broker(id))) { +refreshRegistrationCallback.run(); +} +} +} + +/** + * Check if the current broker registration needs to be refreshed. + * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. + */ +boolean brokerRegistrationNeedsRefresh( +MetadataVersion metadataVersion, +BrokerRegistration registration +) { +// If there is no existing registration, the BrokerLifecycleManager must still be sending it. +// So we don't
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]
apourchet commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -244,18 +271,112 @@ public static Map optimizeRackAwareStandbyTas ); LOG.info("Assignment before standby task optimization has cost {}", initialCost); -throw new UnsupportedOperationException("Not yet Implemented."); +final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState); +final BiFunction> getMovableTasks = (source, destination) -> { +return source.tasks().values().stream() +.filter(task -> task.type() == AssignedTask.Type.STANDBY) +.filter(task -> !destination.tasks().containsKey(task.id())) +.filter(task -> { +final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId()); +final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId()); +return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments); +}) +.map(AssignedTask::id) +.sorted() +.collect(Collectors.toList()); +}; + +final long startTime = System.currentTimeMillis(); +boolean taskMoved = true; +int round = 0; +final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create( + applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds); +while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { +taskMoved = false; +round++; +for (int i = 0; i < kafkaStreamsAssignments.size(); i++) { +final UUID clientId1 = clientIds.get(i); +final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1)); +for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) { +final UUID clientId2 = clientIds.get(i); +final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2)); + +final String rack1 = clientRacks.get(clientState1.processId().id()).get(); +final String rack2 = clientRacks.get(clientState2.processId().id()).get(); +// Cross rack traffic can not be reduced if racks are the same +if (rack1.equals(rack2)) { +continue; +} + +final List movable1 = getMovableTasks.apply(clientState1, clientState2); +final List movable2 = getMovableTasks.apply(clientState2, clientState1); + +// There's no needed to optimize if one is empty because the optimization +// can only swap tasks to keep the client's load balanced +if (movable1.isEmpty() || movable2.isEmpty()) { +continue; +} + +final List taskIdList = Stream.concat(movable1.stream(), movable2.stream()) +.sorted() +.collect(Collectors.toList()); +final List clients = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList()); + +final AssignmentGraph assignmentGraph = buildTaskGraph( +assignmentsByUuid, +clientRacks, +taskIdList, +clients, +topicPartitionsByTaskId, +crossRackTrafficCost, +nonOverlapCost, +false, +false, Review Comment: you're right, good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
apourchet opened a new pull request, #16147: URL: https://github.com/apache/kafka/pull/16147 This PR takes care of making the call back to`TaskAssignor.onAssignmentComputed`. It also contains a change to the public AssignmentConfigs API, as well as some simplifications of the StickyTaskAssignor. This PR also changes the rack information fetching to happen lazily in the case where the TaskAssignor makes its decisions without said rack information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]
ableegoldman merged PR #16129: URL: https://github.com/apache/kafka/pull/16129 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on PR #16129: URL: https://github.com/apache/kafka/pull/16129#issuecomment-2140934597 Test failures are unrelated. Merging to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1621467009 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -555,18 +556,21 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe RackUtils.annotateTopicPartitionsWithRackInfo(cluster, internalTopicManager, allTopicPartitions); -final Set logicalTasks = logicalTaskIds.stream().map(taskId -> { -final Set stateStoreNames = topologyMetadata -.stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) -.keySet(); -final Set topicPartitions = topicPartitionsForTask.get(taskId); -return new DefaultTaskInfo( -taskId, -!stateStoreNames.isEmpty(), -stateStoreNames, -topicPartitions -); -}).collect(Collectors.toSet()); +final Map logicalTasks = logicalTaskIds.stream().collect(Collectors.toMap( +Function.identity(), +taskId -> { +final Set stateStoreNames = topologyMetadata + .stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) Review Comment: Ah somehow I missed this before -- this is actually returning _all_ the state stores for this topology, it's not specific to the taskId. This was an existing issue so we don't need to fix it in this PR, it can be addressed in a followup. It might be a bit complicated so I'll take a look at how we can get this info Would've caught this during testing since we definitely want tests with mixed stateless-and-stateful tasks, but still good to fix ASAP ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -72,6 +80,27 @@ public static Map identityAssignment(final Ap return assignments; } +/** + * Assign standby tasks to KafkaStreams clients according to the default logic. + * + * If rack-aware client tags are configured, the rack-aware standby task assignor will be used + * + * @param applicationStatethe metadata and other info describing the current application state + * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients + * + * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment + */ +public static Map defaultStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) { +return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); +} else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) { +return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); Review Comment: Address in a followup: We should just remove this case entirely right? basically it's "if hasRackAwareTags then do tag-based standby task assignment, if doesNotHaveTags then do default standby task assignment" Note that the tag-based rack aware assignment has nothing to do with the rack ids. So `canPerformRackAwareOptimization` is kind of irrelevant to the question here ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task, } return true; } + +private static Map tagBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState)); +final TagStatistics tagStatistics = new TagStatistics(applicationState); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clientsByUuid =
[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850869#comment-17850869 ] Matthias J. Sax commented on KAFKA-16863: - I think `traffic_cost` was on purpose... But I really don't feel strong about it at all. In general, I am always in favor to cleanup stuff; we also just did KIP-1020. Not sure if we should do a single KIP though. I can become very convoluted quickly. I would rather to multiple smaller KIPs? Not sure what other issue there might be? Do you have a list? > Consider removing `default.` prefix for exception handler config > > > Key: KAFKA-16863 > URL: https://issues.apache.org/jira/browse/KAFKA-16863 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Trivial > Labels: need-kip > > Kafka Streams has a set of configs with `default.` prefix. The intent for the > default-prefix is to make a distinction between, well the default, and > in-place overwrites in the code. Eg, users can specify ts-extractors on a > per-topic basis. > However, for the deserialization- and production-exception handlers, no such > overwrites are possible, and thus, `default.` does not really make sense, > because there is just one handler overall. Via KIP-1033 we added a new > processing-exception handler w/o a default-prefix, too. > Thus, we should consider to deprecate the two existing configs names and add > them back w/o the `default.` prefix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323 @jolshan : Thanks for pointing this out. Sorry that I didn't look at the test results carefully before merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan merged PR #16130: URL: https://github.com/apache/kafka/pull/16130 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621463208 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -449,8 +449,8 @@ object KafkaConfig { /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) - // This indicates whether unreleased MetadataVersions should be enabled on this node. - .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) + // This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node. + .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) Review Comment: Yup -- this is the text in the KIP: > Add INTERNAL configuration unstable.feature.versions.enable to allow for non production ready features to be used (for testing) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on PR #16130: URL: https://github.com/apache/kafka/pull/16130#issuecomment-2140914372 I filed https://issues.apache.org/jira/browse/KAFKA-16866 for the one failure and that is getting fixed separately. As for the others, looks like they are frequent flakes. I will go ahead and merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215 ] Kirk True edited comment on KAFKA-16792 at 5/30/24 9:41 PM: These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset was (Author: kirktrue): These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFESTS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > - testPollReturnsRecords > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215 ] Kirk True edited comment on KAFKA-16792 at 5/30/24 9:40 PM: These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFESTS}} - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset was (Author: kirktrue): These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > - testPollReturnsRecords > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215 ] Kirk True edited comment on KAFKA-16792 at 5/30/24 9:34 PM: These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs - testResetToCommittedOffset was (Author: kirktrue): These tests work with KAFKA-16200: - testResetUsingAutoResetPolicy - testFetchStableOffsetThrowInCommitted (wasn't in the above list) The following still don't work: - testCurrentLag - testFetchStableOffsetThrowInPoll - testListOffsetShouldUpdateSubscriptions - testPollReturnsRecords - testResetToCommittedOffset > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testFetchStableOffsetThrowInPoll > - testCurrentLag > - testListOffsetShouldUpdateSubscriptions > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > - testPollReturnsRecords > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. - * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. -if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ## @@ -66,21 +66,19 @@ public GroupAssignment assign( GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { -AbstractUniformAssignmentBuilder assignmentBuilder; - if (groupSpec.members().isEmpty()) return new GroupAssignment(Collections.emptyMap()); if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " + "optimized assignment algorithm"); -assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); +return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) +.build(); Review Comment: any reason why we changed the name to not match the general assignor? Or is this also changed in the original that renamed the files? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16858) Flatten SMT throws NPE
[ https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Strickland updated KAFKA-16858: Attachment: proto.proto > Flatten SMT throws NPE > -- > > Key: KAFKA-16858 > URL: https://issues.apache.org/jira/browse/KAFKA-16858 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.6.0 > Environment: Kafka 3.6 by way of CP 7.6.0 >Reporter: Adam Strickland >Priority: Major > Attachments: FlattenTest.java, proto.proto > > > {{ConnectSchema.expectedClassesFor}} sometimes will throw an NPE as part of a > call to an SMT chain. Stack trace snippet: > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.apply(MomentFlatten.java:84)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.applyWithSchema(MomentFlatten.java:173)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:274)}} > {{at org.apache.kafka.connect.data.Struct.put(Struct.java:203)}} > {{at org.apache.kafka.connect.data.Struct.put(Struct.java:216)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)}} > (the above transform is a sub-class of > {{{}o.a.k.connect.transforms.Flatten{}}}; have confirmed that the error > occurs regardless). > The field being transformed is an array of structs. If the call to > {{Schema#valueSchema()}} (o.a.k.connect.data.ConnectSchema.java:255) returns > {{{}null{}}}, the subsequent call to {{Schema#name()}} at > o.a.k.connect.data.ConnectSchema:268 throws an NPE. > The strange thing that we have observed is that this doesn't always happen; > *sometimes* the struct's schema is found and sometimes it is not. We have > been unable to determine the root cause, but have constructed a test that > replicates the problem as observed (see attachment). > In our case we have worked around the issue with the aforementioned sub-class > of {{{}Flatten{}}}, catching and logging the NPE on that specific use-case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16858) Flatten SMT throws NPE
[ https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850866#comment-17850866 ] Adam Strickland commented on KAFKA-16858: - * {quote}Are you able to provide an anonymized form of your schema directly, rather than just a high-level "Array of Structs"? I'm wondering if your schema is capable of triggering the use of the mutable SchemaWrapper [https://github.com/confluentinc/schema-registry/blob/7b886f309c83041d4f2a5b41b5910f3b8002413a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1779] inside the ProtobufConverter.{quote} Yes; see [^proto.proto] * {quote}w.r.t. the variable validateValue depth: Are you saying that in _error cases_ the recursion depth is unpredictable, or in general? The validateValue should be called at every or almost every location in the tree of values, so I would expect to see lots of different recursion depths. Maybe you can share some more stacktraces as examples.{quote} We were only looking at error cases. I specifically recall seeing the same message break on the 3rd recursion and on the 7th; the array for the message in question contained 2 Structs. * {quote}So far in this investigation, I'm trying to find the source of the null in hopes that we can prevent it, and get well-formed data to the Flatten SMT. Regardless of the result of that investigation, I think we can consider this input malformed, and throw an intentional DataException instead of NullPointerException. Would that be an acceptable solution for you, or does this data need to make it all the way through the pipeline?{quote} Throwing a DataException would be preferable to the NPE. I'm not sure I understand what you mean by "need to make it all the way through the pipeline"... As it stands the particular problematic attribute is not something we care about right now, which is why we can simply squash the Exception. > Flatten SMT throws NPE > -- > > Key: KAFKA-16858 > URL: https://issues.apache.org/jira/browse/KAFKA-16858 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.6.0 > Environment: Kafka 3.6 by way of CP 7.6.0 >Reporter: Adam Strickland >Priority: Major > Attachments: FlattenTest.java, proto.proto > > > {{ConnectSchema.expectedClassesFor}} sometimes will throw an NPE as part of a > call to an SMT chain. Stack trace snippet: > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.apply(MomentFlatten.java:84)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.applyWithSchema(MomentFlatten.java:173)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}} > {{at > com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:274)}} > {{at org.apache.kafka.connect.data.Struct.put(Struct.java:203)}} > {{at org.apache.kafka.connect.data.Struct.put(Struct.java:216)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)}} > {{at > org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)}} > (the above transform is a sub-class of > {{{}o.a.k.connect.transforms.Flatten{}}}; have confirmed that the error > occurs regardless). > The field being transformed is an array of structs. If the call to > {{Schema#valueSchema()}} (o.a.k.connect.data.ConnectSchema.java:255) returns > {{{}null{}}}, the subsequent call to {{Schema#name()}} at > o.a.k.connect.data.ConnectSchema:268 throws an NPE. > The strange thing that we have observed is that this doesn't always happen; > *sometimes* the struct's schema is found and sometimes it is not. We have > been unable to determine the root cause, but have constructed a test that > replicates the problem as observed (see attachment). > In our case we have worked around the issue with the aforementioned sub-class > of {{{}Flatten{}}}, catching and logging the NPE on that specific use-case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850864#comment-17850864 ] Edoardo Comar commented on KAFKA-16047: --- [~gharris1727] I see that for a common produce request, ReplicaManager.appendRecords uses the timeout set as {color:#00}ProducerConfig{color}.{color:#871094}REQUEST_TIMEOUT_MS_CONFIG{color} i.e. {color:#00}CommonClientConfigs.{color}{color:#871094}REQUEST_TIMEOUT_MS_CONFIG{color} would it not make sense to always use that timeout when appending to a log ? BTW, MirrorMaker2 is unusable in a connect cluster set up with exactly once, when the broker server.properties are changed from the settings used in development testing and present in the config properties files. {color:#ff}transaction.state.log.replication.factor{color}{color:#00}=1{color} {color:#ff}transaction.state.log.min.isr{color}{color:#00}=1{color} [~akaltsikis] I see that you could not progress your PR. Are you happy to hand over this bugfix? > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing all the tasks & the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Assignee: Edoardo Comar >Priority: Major > > Source Connectors with 'exactly.once.support = required' may have some of > their tasks that issue InitProducerId requests from the admin client timeout. > In the case of MirrorSourceConnector, which was the source connector that i > found the bug, the bug was effectively making all the tasks (in the specific > case of) become "FAILED". As soon as one of the tasks gets FAILED due to the > 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many > restarts i did to the connector/tasks, i couldn't get the > MirrorSourceConnector in a healthy RUNNING state again. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1ms), there is a chance that the `InitProducerId` requests timeout in case > of "slower-than-expected" Kafka brokers (that do not process & respond to the > above request in <= 1ms). (feel free to read more information about the issue > in the "More Context" section below) > [~ChrisEgerton] I would appreciate it if you could respond to the following > questions > - How and why was the 1ms magic number for transaction timeout has to be > chosen? > - Is there any specific reason that it can be guaranteed that the > `InitProducerId` request can be processed in such a small time window? > - I have tried the above in multiple different Kafka clusters that are hosted > in different underlying datacenter hosts and i don't believe that those > brokers are "slow" for some reason. If you feel that the brokers are slower > than expected, i would appreciate any pointers on how could i find out what > is the bottleneck > h3. Temporary Mitigation > I have increased the timeout to 1000ms (randomly picked this number, just > wanted to give enough time to brokers to always complete those type of > requests). It fix can be found in my fork > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still randomly picks a timeout > for such an operation which may high enough but it's not ensured that it will > always be high enough. Shall we introduce something client configurable ? > At the same time, i was thinking whether it makes sense to introduce some > tests that simulate slower than the "blazing" fast mocked brokers that exist > in Unit Tests, so as to be able to catch this type of low timeouts that > potentially make some software features not usable. > h3. What is affected > The above bug exists in MirrorSourceConnector Tasks running in distributed > Kafka connect cluster or MIrrorMaker 2 jobs that run with distributed mode > enabled (pre-requisite for the exactly.once.support to work). I believe this > should be true for other SourceConnectors as well (as the code-path that was > the one to blame is Connect specific & not MirrorMaker specific). > h3. More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call:
[PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
chia7712 opened a new pull request, #16146: URL: https://github.com/apache/kafka/pull/16146 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16866) RemoteLogManagerTest.testCopyQuotaManagerConfig failing
[ https://issues.apache.org/jira/browse/KAFKA-16866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16866: -- Assignee: Chia-Ping Tsai > RemoteLogManagerTest.testCopyQuotaManagerConfig failing > --- > > Key: KAFKA-16866 > URL: https://issues.apache.org/jira/browse/KAFKA-16866 > Project: Kafka > Issue Type: Test >Affects Versions: 3.8.0 >Reporter: Justine Olshan >Assignee: Chia-Ping Tsai >Priority: Major > > Seems like this test introduced in > [https://github.com/apache/kafka/pull/15625] is failing consistently. > org.opentest4j.AssertionFailedError: > Expected :61 > Actual :11 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
chia7712 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041 @jolshan I file https://github.com/apache/kafka/pull/16146 to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment */ private final Set subscribedTopicIds; -/** - * The number of members to receive an extra partition beyond the minimum quota. - * Minimum Quota = Total Partitions / Total Members - * Example: If there are 11 partitions to be distributed among 3 members, - * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. - */ -private int remainingMembersToGetAnExtraPartition; - /** * Members mapped to the remaining number of partitions needed to meet the minimum quota. - * Minimum quota = total partitions / total members. */ -private Map potentiallyUnfilledMembers; +private final List potentiallyUnfilledMembers; Review Comment: why do we call this potentiallyUnfilledMembers rather than unfilledMembers? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode
[ https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16833: -- Assignee: Alyssa Huang > Cluster missing topicIds from equals and hashCode, PartitionInfo missing > equals and hashCode > > > Key: KAFKA-16833 > URL: https://issues.apache.org/jira/browse/KAFKA-16833 > Project: Kafka > Issue Type: Bug >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode
[ https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16833. Fix Version/s: 3.8.0 Resolution: Fixed > Cluster missing topicIds from equals and hashCode, PartitionInfo missing > equals and hashCode > > > Key: KAFKA-16833 > URL: https://issues.apache.org/jira/browse/KAFKA-16833 > Project: Kafka > Issue Type: Bug >Reporter: Alyssa Huang >Priority: Major > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]
chia7712 merged PR #16062: URL: https://github.com/apache/kafka/pull/16062 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]
chia7712 commented on PR #16062: URL: https://github.com/apache/kafka/pull/16062#issuecomment-2140856964 The failed test is traced by https://issues.apache.org/jira/browse/KAFKA-16866 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. - * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. -if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. - * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. -if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] [MINOR] Code Cleanup - Connect Module [kafka]
chia7712 merged PR #16066: URL: https://github.com/apache/kafka/pull/16066 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16047: - Assignee: Edoardo Comar > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing all the tasks & the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Assignee: Edoardo Comar >Priority: Major > > Source Connectors with 'exactly.once.support = required' may have some of > their tasks that issue InitProducerId requests from the admin client timeout. > In the case of MirrorSourceConnector, which was the source connector that i > found the bug, the bug was effectively making all the tasks (in the specific > case of) become "FAILED". As soon as one of the tasks gets FAILED due to the > 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many > restarts i did to the connector/tasks, i couldn't get the > MirrorSourceConnector in a healthy RUNNING state again. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1ms), there is a chance that the `InitProducerId` requests timeout in case > of "slower-than-expected" Kafka brokers (that do not process & respond to the > above request in <= 1ms). (feel free to read more information about the issue > in the "More Context" section below) > [~ChrisEgerton] I would appreciate it if you could respond to the following > questions > - How and why was the 1ms magic number for transaction timeout has to be > chosen? > - Is there any specific reason that it can be guaranteed that the > `InitProducerId` request can be processed in such a small time window? > - I have tried the above in multiple different Kafka clusters that are hosted > in different underlying datacenter hosts and i don't believe that those > brokers are "slow" for some reason. If you feel that the brokers are slower > than expected, i would appreciate any pointers on how could i find out what > is the bottleneck > h3. Temporary Mitigation > I have increased the timeout to 1000ms (randomly picked this number, just > wanted to give enough time to brokers to always complete those type of > requests). It fix can be found in my fork > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still randomly picks a timeout > for such an operation which may high enough but it's not ensured that it will > always be high enough. Shall we introduce something client configurable ? > At the same time, i was thinking whether it makes sense to introduce some > tests that simulate slower than the "blazing" fast mocked brokers that exist > in Unit Tests, so as to be able to catch this type of low timeouts that > potentially make some software features not usable. > h3. What is affected > The above bug exists in MirrorSourceConnector Tasks running in distributed > Kafka connect cluster or MIrrorMaker 2 jobs that run with distributed mode > enabled (pre-requisite for the exactly.once.support to work). I believe this > should be true for other SourceConnectors as well (as the code-path that was > the one to blame is Connect specific & not MirrorMaker specific). > h3. More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call: fenceProducer(api=INIT_PRODUCER_ID) > {code} > *Broker Logs* > {code:java} > [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning > COORDINATOR_NOT_AVAILABLE error code to client for > kafka-connect-uat-mm2-msc-20th-7's InitProducerId request > (kafka.coordinator.transaction.TransactionCoordinator) > [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: > TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for > TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, > lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed > due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), > aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the >
Re: [PR] KAFKA-15853: Move ZKConfigs related static method out of core and into ZKConfigs [kafka]
chia7712 commented on PR #16109: URL: https://github.com/apache/kafka/pull/16109#issuecomment-2140831052 @OmniaGM nice idea but we need to fix conflicts first :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1621405276 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -325,24 +325,12 @@ private Group validateOffsetCommit( } Review Comment: I kind of forget why we wanted to check `GroupIdNotFoundException`. I feel the current implementation does support the classic protocol member -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1621403173 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ## @@ -857,6 +857,7 @@ public void validateOffsetCommit( throw Errors.UNKNOWN_MEMBER_ID.exception(); } +// TODO: A temp marker. Will remove it when the pr is open. if (!isTransactional && isInState(COMPLETING_REBALANCE)) { Review Comment: Not sure why we only check `COMPLETING_REBALANCE` but not `PREPARING_REBALANCE`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]
chia7712 merged PR #16112: URL: https://github.com/apache/kafka/pull/16112 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 opened a new pull request, #16145: URL: https://github.com/apache/kafka/pull/16145 During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` to ensure compatibility. There's no need to change `ConsumerGroup#validateOffsetFetch` because the member id and member epoch are always empty and -1 in the classic protocol, so the offset fetch request is always valid. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]
gharris1727 commented on code in PR #15513: URL: https://github.com/apache/kafka/pull/15513#discussion_r1621400369 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -737,12 +739,12 @@ public void runOuterJoin(final StreamJoined streamJoine inputTopic1.pipeInput(expectedKey, "C" + expectedKey); } processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "C0+a0", 0L), -new KeyValueTimestamp<>(0, "C0+b0", 0L), -new KeyValueTimestamp<>(1, "C1+a1", 0L), -new KeyValueTimestamp<>(1, "C1+b1", 0L), -new KeyValueTimestamp<>(2, "C2+b2", 0L), -new KeyValueTimestamp<>(3, "C3+b3", 0L) +new KeyValueTimestamp<>(0, "C0+0", 0L), +new KeyValueTimestamp<>(0, "C0+0", 0L), +new KeyValueTimestamp<>(1, "C1+1", 0L), +new KeyValueTimestamp<>(1, "C1+1", 0L), Review Comment: You're right, I didn't notice this. I did a search-and-replace renaming, and reverted the stuff which didn't make sense. I did have to manually renumber stuff like "a0-0", and some places where capital letters "A0" were used on the inputStream2 to fit the pattern better. PTAL, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621395136 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } +// Look into this one Review Comment: My mistake leaving that there, that was a comment for myself that I forgot to remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16866) RemoteLogManagerTest.testCopyQuotaManagerConfig failing
Justine Olshan created KAFKA-16866: -- Summary: RemoteLogManagerTest.testCopyQuotaManagerConfig failing Key: KAFKA-16866 URL: https://issues.apache.org/jira/browse/KAFKA-16866 Project: Kafka Issue Type: Test Affects Versions: 3.8.0 Reporter: Justine Olshan Seems like this test introduced in [https://github.com/apache/kafka/pull/15625] is failing consistently. org.opentest4j.AssertionFailedError: Expected :61 Actual :11 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621386306 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); +} + @Test public void testApplicationEvent() { ApplicationEvent e = new PollEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +verify(applicationEventProcessor).process(e); Review Comment: I checked Mockito documentation and adding times(1) is redundant, so not really a big deal either way to keep it or remove it. Do let me know though if there is a stylistic preference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
jolshan commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438 Can we look at testCopyQuotaManagerConfig() – kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty consistently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
brenden20 commented on PR #16124: URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140800759 @kirktrue thank you for the suggestions, I have implemented and pushed your suggestions. Let me know if everything looks good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]
chia7712 commented on PR #16112: URL: https://github.com/apache/kafka/pull/16112#issuecomment-2140786920 @ahmedryasser Thanks for your contribution. Could you please add "MINOR: " to your title? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
cmccabe commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1621372420 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. + * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param idThe ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { +if (log.isTraceEnabled()) { +log.trace("Metadata version change is present: {}", +delta.metadataVersionChanged()); +} +checkBrokerRegistration = true; +} +} +if (delta.clusterDelta() != null) { +if (delta.clusterDelta().changedBrokers().get(id) != null) { +if (log.isTraceEnabled()) { +log.trace("Broker change is present: {}", +delta.clusterDelta().changedBrokers().get(id)); +} +checkBrokerRegistration = true; +} +} +if (checkBrokerRegistration) { +if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), +delta.clusterDelta().broker(id))) { +refreshRegistrationCallback.run(); +} +} +} + +/** + * Check if the current broker registration needs to be refreshed. + * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. + */ +boolean brokerRegistrationNeedsRefresh( +MetadataVersion metadataVersion, +BrokerRegistration registration +) { +// If there is no existing registration, the BrokerLifecycleManager must still be sending it. +// So we don't
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on PR #16116: URL: https://github.com/apache/kafka/pull/16116#issuecomment-2140785638 @OmniaGM Sorry that please fix the conflicts again :_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
chia7712 commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621368550 ## build.gradle: ## @@ -787,6 +800,12 @@ subprojects { skipProjects = [ ":jmh-benchmarks", ":trogdor" ] skipConfigurations = [ "zinc" ] } + + afterEvaluate { Review Comment: Maybe we can set spotless directly. For example: ``` if (project.name in spotlessApplyModules) { apply plugin: 'com.diffplug.spotless' spotless { java { importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax', '', '\\#') removeUnusedImports() } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]
gharris1727 commented on code in PR #16095: URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) { if (this.state == State.FAILED) return; +// Call stop() on the connector to release its resources. Connector +// could fail in the start() method, which is why we call stop() on +// INIT state as well. +if (this.state == State.STARTED || this.state == State.INIT) +connector.stop(); Review Comment: This is a potentially blocking call to the connector, and I don't think that's a good fit for this onFailure handler. This call would delay the statusListener call, which delays notifying the REST API of the FAILED status and updating the metrics. If it blocks indefinitely, the status and metrics are never updated. There is a connector.stop() call in doShutdown that could be changed to execute for the INIT and FAILED states. That would leave the resources allocated while the connector is waiting in the FAILED state, but would at least ensure they don't leak long-term. We may also change the control flow to make the transition to the FAILED state trigger doShutdown early, rather than having it wait() with all the resources still allocated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
chia7712 commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621358471 ## checkstyle/suppressions.xml: ## @@ -361,4 +361,7 @@ + + Review Comment: why we need this change? ## build.gradle: ## @@ -1007,7 +1026,7 @@ project(':core') { testImplementation libs.junitJupiter testImplementation libs.slf4jlog4j testImplementation libs.caffeine - Review Comment: please revert this change ## build.gradle: ## @@ -47,7 +47,7 @@ plugins { // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed // artifacts - see https://github.com/johnrengelman/shadow/issues/901 id 'com.github.johnrengelman.shadow' version '8.1.0' apply false - id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 + id 'com.diffplug.spotless' version "${spotlessVersion}" apply false Review Comment: Do we need this variable? Also, why not using latest version `6.25.0`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16518) Storage tool changes for KIP-853
[ https://issues.apache.org/jira/browse/KAFKA-16518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850851#comment-17850851 ] Muralidhar Basani commented on KAFKA-16518: --- [~jsancio] have one short question in the pr about constructor of VoterSet and VoterNode > Storage tool changes for KIP-853 > > > Key: KAFKA-16518 > URL: https://issues.apache.org/jira/browse/KAFKA-16518 > Project: Kafka > Issue Type: Sub-task > Components: tools >Reporter: José Armando García Sancio >Assignee: Muralidhar Basani >Priority: Major > Fix For: 3.8.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-kafka-storage -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Refactor DynamicConfig [kafka]
chia7712 commented on code in PR #16133: URL: https://github.com/apache/kafka/pull/16133#discussion_r1621350440 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -675,7 +677,7 @@ object DynamicLogConfig { // Exclude message.format.version for now since we need to check that the version // is supported on all brokers in the cluster. @nowarn("cat=deprecation") - val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG) Review Comment: Do we need this variable? Maybe we can remove it by following change. ```scala // Exclude message.format.version for now since we need to check that the version // is supported on all brokers in the cluster. @nowarn("cat=deprecation") val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG ``` ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) { -val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains) +val nonDynamic = configNames.filter(nonDynamicProps.contains) Review Comment: How about `configNames.intersect(nonDynamicProps)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2140752725 The build does not seem to start… I am not sure why. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]
kirktrue commented on PR #16115: URL: https://github.com/apache/kafka/pull/16115#issuecomment-2140747685 @brenden20, as mentioned on another one of your PRs, there's a checkstyle violation here. You can run this command locally to avoid waiting for the CI infrastructure to catch it: ``` ./gradlew check -x test ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621344907 ## server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum GroupVersion implements FeatureVersion { + +// Version 1 enables the classic rebalance protocol. This is the default +// behavior even if the feature flag is not set. +GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()), Review Comment: Updated to use the version 0 approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
kirktrue commented on PR #16124: URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140745523 The test failures are unrelated, FWIW. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621342452 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; Review Comment: ```suggestion import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; Review Comment: Just bring these explicit imports back to make checkstyle stop complaining ```suggestion import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Enable transaction verification with new group coordinator in TransactionsTest [kafka]
dajac merged PR #16139: URL: https://github.com/apache/kafka/pull/16139 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2140741298 Looks like there are some checkstyle failures due to the use of wildcard imports. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1290,4 +1290,4 @@ static class MemberInfo { this.memberEpoch = Optional.empty(); } } -} +} Review Comment: We should revert/fix this change as it's whitespace only. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on code in PR #15989: URL: https://github.com/apache/kafka/pull/15989#discussion_r1621334504 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ## @@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() { verify(configLog).stop(); } +@Test +public void testPutTaskConfigsZeroTasks() throws Exception { +when(configLog.partitionCount()).thenReturn(1); + +configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); +verifyConfigure(); +configStorage.start(); + +// Bootstrap as if we had already added the connector, but no tasks had been added yet +whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList()); + +// Null before writing +ClusterConfigState configState = configStorage.snapshot(); +assertEquals(-1, configState.offset()); + +// Task configs should read to end, write to the log, read to end, write root. + doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd(); + +expectConvertWriteRead( +COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), +"tasks", 0); // We have 0 tasks + +configStorage.putTaskConfigs("connector1", Collections.emptyList()); + +// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks +configUpdateListener.onTaskConfigUpdate(Collections.emptyList()); Review Comment: We need to use `verify` to make sure this method is called as expected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
kirktrue commented on code in PR #16124: URL: https://github.com/apache/kafka/pull/16124#discussion_r1621332051 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -478,6 +478,23 @@ public void resetTimer() { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", heartbeatTimer=" + heartbeatTimer + +", heartbeatIntervalMs=" + heartbeatIntervalMs; +} + +// Visible for testing +protected Timer heartbeatTimer() { +return this.heartbeatTimer; +} + +// Visible for testing +protected long heartbeatIntervalMs() { +return this.heartbeatIntervalMs; Review Comment: ```suggestion return heartbeatIntervalMs; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -152,6 +152,34 @@ public void cleanup() { } } +@Test +public void testHeartBeatRequestStateToStringBase() { +final long retryBackoffMs = 100; +final long retryBackoffMaxMs = 1000; +LogContext logContext = new LogContext(); +HeartbeatRequestState heartbeatRequestState1 = new HeartbeatRequestState( Review Comment: Super nit: we can drop the `1` at the end of the variable name, right? ```suggestion HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState( ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -478,6 +478,23 @@ public void resetTimer() { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", heartbeatTimer=" + heartbeatTimer + +", heartbeatIntervalMs=" + heartbeatIntervalMs; +} + +// Visible for testing +protected Timer heartbeatTimer() { +return this.heartbeatTimer; Review Comment: ```suggestion return heartbeatTimer; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -152,6 +152,34 @@ public void cleanup() { } } +@Test +public void testHeartBeatRequestStateToStringBase() { +final long retryBackoffMs = 100; +final long retryBackoffMaxMs = 1000; +LogContext logContext = new LogContext(); +HeartbeatRequestState heartbeatRequestState1 = new HeartbeatRequestState( +logContext, +time, +10, +retryBackoffMs, +retryBackoffMaxMs, +.2 +); + +RequestState requestState = new RequestState( +logContext, + "org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager$HeartbeatRequestState", Review Comment: Perhaps we could make `HeartbeatRequestState` package-protected, then we could do this: ```suggestion HeartbeatRequestManager.HeartbeatRequestState.class.getName(), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16802) Move build.gradle java version information inside of a java block
[ https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-16802. - Fix Version/s: 3.8.0 Resolution: Fixed > Move build.gradle java version information inside of a java block > - > > Key: KAFKA-16802 > URL: https://issues.apache.org/jira/browse/KAFKA-16802 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Assignee: Muralidhar Basani >Priority: Major > Labels: newbie > Fix For: 3.8.0 > > > The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation] > > [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16802 : Moving java versions inside java block [kafka]
gharris1727 merged PR #16135: URL: https://github.com/apache/kafka/pull/16135 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]
chia7712 commented on PR #16042: URL: https://github.com/apache/kafka/pull/16042#issuecomment-2140664027 > The test names are testDescribeLogDirsWithoutAnyPartitionTopic and testDescribeLogDirs. It seems to me the method needs to come with the guarantee: `DescribeLogDirsTopic` should not have empty `partitions` Hence, could you please add following asserts to `testDescribeLogDirs` ```scala responses.foreach { response => assertEquals(Errors.NONE.code, response.errorCode) assertTrue(response.totalBytes > 0) assertTrue(response.usableBytes >= 0) assertFalse(response.topics().isEmpty) response.topics().forEach(t => assertFalse(t.partitions().isEmpty)) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850834#comment-17850834 ] Chia-Ping Tsai commented on KAFKA-15305: {quote} So seems we also need to make sure that the HB manager is actually polled when closing the consumer (the HBManager.pollOnClose you had suggested at some point) {quote} yep, I prefer to return HB of leaving group by `pollOnClose` and that is the description I written in KAFKA-16639. Also, I agree the solution of KAFKA-16639 PR as in the current flow the HB of leaving group can be generated. thread of closing consumer 1. prepareShutdown (https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1268) 2. waiting for `LeaveOnCloseEvent` (https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1279) 3. call ConsumerNetworkThread#close to stop the loop of ConsumerNetworkThread#run() (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L281) network thread (ConsumerNetworkThread) 1. processApplicationEvents (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L138). This method must be executed to handle `LeaveOnCloseEvent` 2. After `processApplicationEvents` (LeaveOnCloseEvent), `membershipManager.leaveGroup` is executed so ConsumerNetworkThread is aware of "leaving" 3. After `processApplicationEvents`, ConsumerNetworkThread will call HB manager poll sequentially (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L144) In short, ConsumerNetworkThread always call the HB manager poll after handling LeaveOnCloseEvent. {quote} With that, at this point we would actually generate the HB to leave, add then we would hit the logic you mentioned above with a request to send (not empty), here. Makes sense? {quote} Both ways (poll/pollOnClose) can resolve KAFKA-16639. I feel your comment (https://github.com/apache/kafka/pull/16017#discussion_r1612039771) is a simple solution, and so I did not add comment to say "we must to create HB request in pollOnClose) > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Chia-Ping Tsai >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > While working on https://issues.apache.org/jira/browse/KAFKA-15304 > close() API supplies a timeout parameter so that the consumer can have a > grace period to process things before shutting down. The background thread > currently doesn't do that, when close() is initiated, it will immediately > close all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. Maybe the correct things to do is to first stop > accepting API request, second, let the runOnce() continue to run before the > shutdown timer expires, then we can force closing all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: make public the consumer group migration policy config [kafka]
dajac merged PR #16128: URL: https://github.com/apache/kafka/pull/16128 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404 I'm going to close and reopen to force another build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063 @lianetm—relevant test failures have been addressed. There are three unrelated test failures from flaky tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621207749 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java: ## @@ -18,105 +18,63 @@ import org.apache.kafka.common.Uuid; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; /** - * The assignment specification for a consumer group member. + * Implementation of the {@link MemberSubscriptionSpec} interface. */ -public class AssignmentMemberSpec { -/** - * The instance ID if provided. - */ -private final Optional instanceId; - -/** - * The rack ID if provided. - */ +public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { private final Optional rackId; - -/** - * Topics Ids that the member is subscribed to. - */ private final Set subscribedTopicIds; /** - * Partitions assigned keyed by topicId. - */ -private final Map> assignedPartitions; - -/** - * @return The instance ID as an Optional. + * Constructs a new {@code MemberSubscriptionSpecImpl}. + * + * @param rackIdThe rack Id. + * @param subscribedTopicIdsThe set of subscribed topic Ids. */ -public Optional instanceId() { -return instanceId; +public MemberSubscriptionSpecImpl( +Optional rackId, +Set subscribedTopicIds +) { +Objects.requireNonNull(rackId); +Objects.requireNonNull(subscribedTopicIds); +this.rackId = rackId; +this.subscribedTopicIds = subscribedTopicIds; Review Comment: I was just following the format I saw in TargetAssignmentResult and a few places, wasn't sure what to use -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621196989 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: I actually wanted to asak about whether we want to return null or return an empty object. I returned a new memberSubscriptionSpecImpl object in the impl -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621186636 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: thanks for the comment! I wanted to say that during our design discussions, we had agreed to keep it as memberSubscriptionSpec to maintain consistency and streamline the review process so we don't go back and forth. If there are new considerations or changes that we might not have anticipated, I would love to understand them better so we can make this more efficient going forward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621186636 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: thanks for the comment! I wanted to say that during our design discussions, we had agreed to keep it as memberSubscriptionSpec to maintain consistency and streamline the review process so we don't go back and forth. If there are new considerations or changes that we might not have anticipated, I would love to understand them better so we can make this more efficient going forward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621089819 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -354,6 +354,7 @@ Found problem: MetadataVersion.LATEST_PRODUCTION, Map(TestFeatureVersion.FEATURE_NAME -> featureLevel), allFeatures, +false, Review Comment: `if (featureLevel <= Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))` this means we skip version 2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]
omkreddy merged PR #16046: URL: https://github.com/apache/kafka/pull/16046 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
yashmayya commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839 @mdedetrich apologies for the late response, I didn't get notified for your comment oddly enough. Please feel free to take over, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org