[jira] [Commented] (KAFKA-16616) refactor mergeWith in MetadataSnapshot
[ https://issues.apache.org/jira/browse/KAFKA-16616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846515#comment-17846515 ] Cao Manh Dat commented on KAFKA-16616:
--

Hi [~alyssahuang], can I work on this item?

> refactor mergeWith in MetadataSnapshot
> --
>
> Key: KAFKA-16616
> URL: https://issues.apache.org/jira/browse/KAFKA-16616
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 3.7.0
> Reporter: Alyssa Huang
> Priority: Minor
>
> Right now we keep track of topic ids and partition metadata to add/update
> separately in mergeWith (e.g. two maps passed as arguments). This means we
> iterate over topic metadata twice, which could be costly when we're dealing
> with a large number of updates.
> `updatePartitionLeadership`, which calls `mergeWith`, does something similar
> (it generates the map of topic ids to update in a loop separate from the list
> of partition metadata to update) and should be refactored as well.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
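The refactor the ticket describes — carrying the topic-id and partition updates in one structure so mergeWith makes a single pass — might look roughly like this sketch. All names here (`MetadataSnapshotSketch`, `TopicDelta`, `mergeWith`) are illustrative stand-ins, not Kafka's actual MetadataSnapshot API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: instead of passing separate "topic ids to update" and
// "partition metadata to update" maps and iterating over topic metadata twice,
// carry both pieces per topic in one record and iterate once.
public class MetadataSnapshotSketch {

    // One entry per topic: its id plus the partition metadata to merge.
    record TopicDelta(String topicId, List<String> partitionMetadata) { }

    private final Map<String, TopicDelta> topics = new HashMap<>();

    // Single pass over the combined updates instead of two passes over
    // two parallel maps.
    public void mergeWith(Map<String, TopicDelta> updates) {
        for (Map.Entry<String, TopicDelta> e : updates.entrySet()) {
            topics.merge(e.getKey(), e.getValue(),
                (oldDelta, newDelta) -> newDelta); // last write wins
        }
    }

    public int topicCount() {
        return topics.size();
    }
}
```

A caller such as the `updatePartitionLeadership` path mentioned in the ticket would then build this one map in its loop rather than two parallel collections.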
Re: [PR] KAFKA-14588 [3/N] ConfigCommandTest rewritten in java [kafka]
chia7712 commented on code in PR #15930:
URL: https://github.com/apache/kafka/pull/15930#discussion_r1601032507

## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:

@@ -878,6 +886,486 @@ public void shouldNotDescribeUserScramCredentialsWithEntityDefaultUsingBootstrap
         verifyUserScramCredentialsNotDescribed(defaultUserOpt);
     }

+    @Test
+    public void shouldAddTopicConfigUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
+            "--entity-name", "my-topic",
+            "--entity-type", "topics",
+            "--alter",
+            "--add-config", "a=b,c=d"));
+
+        KafkaZkClient zkClient = mock(KafkaZkClient.class);
+        when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new Properties());
+
+        ConfigCommand.alterConfigWithZk(null, createOpts, new AdminZkClient(zkClient, scala.None$.empty()) {
+            @Override
+            public void changeTopicConfig(String topic, Properties configChange) {
+                assertEquals("my-topic", topic);
+                assertEquals("b", configChange.get("a"));
+                assertEquals("d", configChange.get("c"));
+            }
+        });
+    }
+
+    @Test

Review Comment:
   How about using `ValueSource`?
   ```java
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void shouldAlterTopicConfig(boolean file) {
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }

+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(

Review Comment:
   nit: It may be better to split this one into two methods. One to validate the generation. Another one to validate the protocol type and name.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }

+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",

Review Comment:
   nit: `group epoch` -> `member epoch`.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }

+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",
+                    requestGenerationId, group.groupEpoch(), group.groupId())

Review Comment:
   nit: `group.groupEpoch()` -> `member.memberEpoch()`.

## group-coordina
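The split the reviewer suggests could look roughly like the sketch below: one method validating only the generation and one validating only the protocol type and name. The types here are simplified stand-ins for Kafka's ConsumerGroupMember and exception classes, not the real API, and the message wording follows the other review nits (member epoch, not group epoch):

```java
// Hypothetical sketch of splitting the combined check into two methods.
public class ClassicProtocolValidation {

    // Simplified stand-in for ConsumerGroupMember.
    static class Member {
        final String memberId;
        final int memberEpoch;
        final String protocolType;
        final String protocolName;
        Member(String id, int epoch, String type, String name) {
            this.memberId = id;
            this.memberEpoch = epoch;
            this.protocolType = type;
            this.protocolName = name;
        }
    }

    // Validates only the generation: the request's generation id must match
    // the member's epoch (note: member epoch, not group epoch).
    static void throwIfGenerationIdUnmatched(Member member, int requestGenerationId) {
        if (member.memberEpoch != requestGenerationId) {
            throw new IllegalStateException(String.format(
                "The request generation id %d is not equal to the member epoch %d of member %s.",
                requestGenerationId, member.memberEpoch, member.memberId));
        }
    }

    // Validates only the protocol type and name from the request.
    static void throwIfProtocolUnmatched(Member member, String requestProtocolType, String requestProtocolName) {
        if (!member.protocolType.equals(requestProtocolType)
                || !member.protocolName.equals(requestProtocolName)) {
            throw new IllegalStateException(String.format(
                "The request protocol %s/%s does not match member %s.",
                requestProtocolType, requestProtocolName, member.memberId));
        }
    }
}
```

Each call site can then invoke only the check it needs, which is the point of the suggested split.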
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846508#comment-17846508 ] Luke Chen commented on KAFKA-16760:
--

[~soarez], I'm really sorry, I can't believe I didn't commit my change to the branch yesterday. I just updated the branch, and it reliably failed in my env after 3 runs. Please give it a try again. I'd like to know if this is the expected behavior or if it is a bug.

> alterReplicaLogDirs failed even if responded with none error
> --
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0
> Reporter: Luke Chen
> Priority: Major
>
> When firing an alterLogDirRequest, it gets a NONE error result. But actually, the
> alterLogDir never happened, and these errors are logged:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition
> topicB-0 has an older epoch (0) than the current leader. Will await the new
> LeaderAndIsr state before resuming fetching.
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: it's under KRaft mode, so the log mentioning LeaderAndIsr is wrong.
> This can be reproduced in this
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] by running
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here:
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789

## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:

@@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte

     private final String baseDisplayName;
     private final ClusterConfig clusterConfig;
-    private final AtomicReference clusterReference;
-    private final AtomicReference zkReference;
     private final boolean isCombined;

     public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
         this.baseDisplayName = baseDisplayName;
         this.clusterConfig = clusterConfig;
-        this.clusterReference = new AtomicReference<>();
-        this.zkReference = new AtomicReference<>();
         this.isCombined = isCombined;
     }

     @Override
     public String getDisplayName(int invocationIndex) {
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-            .map(Object::toString)
-            .collect(Collectors.joining(", "));
+            .map(Object::toString)

Review Comment:
   please avoid those unrelated changes. smaller is better

## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:

@@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) {

     public void start() {

Review Comment:
   in this method we should always call `format` first. That is a big sugar to users

## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:

@@ -171,39 +140,39 @@ public Optional controllerListenerName() {

     @Override
     public Collection controllerSocketServers() {
         return controllers()
-            .map(ControllerServer::socketServer)
-            .collect(Collectors.toList());
+            .map(ControllerServer::socketServer)
+            .collect(Collectors.toList());
     }

     @Override
     public SocketServer anyBrokerSocketServer() {
         return brokers()
-            .map(BrokerServer::socketServer)
-            .findFirst()
-            .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+            .map(BrokerServer::socketServer)

Review Comment:
   ditto. please revert those changes.

## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:

@@ -284,24 +258,51 @@ public void startBroker(int brokerId) {

     @Override
     public void waitForReadyBrokers() throws InterruptedException {
         try {
-            clusterReference.get().waitForReadyBrokers();
+            clusterTestKit.waitForReadyBrokers();
         } catch (ExecutionException e) {
             throw new AssertionError("Failed while waiting for brokers to become ready", e);
         }
     }

-    private BrokerServer findBrokerOrThrow(int brokerId) {
-        return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
-            .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
-    }
-
     public Stream brokers() {
-        return clusterReference.get().brokers().values().stream();
+        return clusterTestKit.brokers().values().stream();
     }

     public Stream controllers() {
-        return clusterReference.get().controllers().values().stream();
+        return clusterTestKit.controllers().values().stream();
     }

+    public void format() throws Exception {

Review Comment:
   `format` and `buildAndFormatCluster` can be merged. for example:
   ```java
   public void format() {
       if (this.clusterTestKit == null) {
           try {
               KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder()
                   .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                   .setCombined(isCombined)
                   .setNumBrokerNodes(clusterConfig.numBrokers())
                   .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                   .setPerServerProperties(clusterConfig.perServerOverrideProperties())
                   .setNumControllerNodes(clusterConfig.numControllers()).build());
               if (Boolean.parseBoolean(clusterConfig.serverProperties()
                       .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                   this.embeddedZookeeper = new EmbeddedZookeeper();
                   builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port()));
               }
               // Copy properties into the Test
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on code in PR #15933:
URL: https://github.com/apache/kafka/pull/15933#discussion_r1600990933

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:

@@ -960,6 +666,14 @@ private void expectConvertWriteRead(final String configKey, final Schema valueSc
         });
     }

+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,

Review Comment:
   please remove this unused function
Re: [PR] MINOR: fix flaky testRecordThreadIdleRatioTwoThreads test [kafka]
chia7712 commented on PR #15937:
URL: https://github.com/apache/kafka/pull/15937#issuecomment-2111647759

@jeffkbkim It seems the test case is still a bit flaky. See https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15933/2/testReport/org.apache.kafka.coordinator.group.runtime/MultiThreadedEventProcessorTest/Build___JDK_8_and_Scala_2_12___testRecordThreadIdleRatio__/
Do you have free time to dig into it?
Re: [PR] MINOR: Rename `Record` to `CoordinatorRecord` [kafka]
chia7712 merged PR #15949:
URL: https://github.com/apache/kafka/pull/15949
Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]
chia7712 commented on PR #15958:
URL: https://github.com/apache/kafka/pull/15958#issuecomment-2111632709

```
[2024-05-15T01:51:49.255Z] > Task :core:compileScala
[2024-05-15T01:51:49.255Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/controller/KafkaController.scala:1202:18: method setOrCreatePartitionReassignment in class KafkaZkClient is deprecated
[2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:518:14: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
[2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:524:24: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
[2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:533:23: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
[2024-05-15T01:51:50.590Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15958/core/src/main/scala/kafka/zk/ZkData.scala:533:49: class LegacyPartitionAssignment in object ReassignPartitionsZNode is deprecated
```

Please fix the build errors.
Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
chia7712 commented on PR #15917:
URL: https://github.com/apache/kafka/pull/15917#issuecomment-2111631318

@showuon Please take a look if you have free time. I'd like to migrate all tests of storage to the new test infra after this PR gets merged.
Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]
chia7712 merged PR #15885:
URL: https://github.com/apache/kafka/pull/15885
Re: [PR] MINOR: Fix TargetAssignmentBuilderBenchmark [kafka]
chia7712 merged PR #15950:
URL: https://github.com/apache/kafka/pull/15950
Re: [PR] MINOR: Fix warnings in streams javadoc [kafka]
chia7712 merged PR #15955:
URL: https://github.com/apache/kafka/pull/15955
[jira] [Assigned] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
[ https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16763:
--

Assignee: 黃竣陽 (was: Chia-Ping Tsai)

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: 黃竣陽
> Priority: Minor
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)
[jira] [Resolved] (KAFKA-16671) Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured
[ https://issues.apache.org/jira/browse/KAFKA-16671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16671.
--

Fix Version/s: 3.8.0
Resolution: Fixed

> Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured
> --
>
> Key: KAFKA-16671
> URL: https://issues.apache.org/jira/browse/KAFKA-16671
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Minor
> Fix For: 3.8.0
>
> Looped 1000 times on my local machine, and all passed. Let's enable the test
> to see what happens in our CI.
Re: [PR] KAFKA-16671: enable test for ensureInternalEndpointIsSecured [kafka]
chia7712 merged PR #15868:
URL: https://github.com/apache/kafka/pull/15868
[PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
appchemist opened a new pull request, #15961:
URL: https://github.com/apache/kafka/pull/15961

- If any non-retriable exception was encountered during a metadata update, clear it and throw it in the new consumer's poll (like the legacy consumer).
- If an invalid topic is discovered in metadata, the new consumer's poll throws InvalidTopicException.
- Enable the `KafkaConsumerTest.testSubscriptionOnInvalidTopic` test.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846483#comment-17846483 ] appchemist commented on KAFKA-16764:
--

Hi [~lianetm], I would like to take this issue.

> New consumer should throw InvalidTopicException on poll when invalid topic in
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.7.0
> Reporter: Lianet Magrans
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> A call to consumer.poll should throw InvalidTopicException if an invalid
> topic is discovered in metadata. This can be easily reproduced by calling
> subscribe("invalid topic") and then poll, for example. The new consumer does
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer
> does.
> The legacy consumer achieves this by checking for metadata exceptions on
> every iteration of the ConsumerNetworkClient (see
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).
> This is probably what makes
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
> fail for the new consumer. Once this bug is fixed, we should be able to
> enable that test for the new consumer.
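The mechanism the ticket describes — accumulating a fatal metadata error and surfacing it on the next poll — can be sketched as follows. These are hypothetical stand-in classes to illustrate the clear-and-throw behavior, not the actual KafkaConsumer internals:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: poll() surfaces a fatal metadata error such as an invalid
// topic instead of silently continuing, mirroring how the legacy consumer
// checks metadata errors on every network-client iteration.
public class MetadataErrorPropagation {

    // Stand-in for org.apache.kafka.common.errors.InvalidTopicException.
    static class InvalidTopicException extends RuntimeException {
        InvalidTopicException(String msg) { super(msg); }
    }

    // Stand-in for the consumer's metadata cache of fatal per-topic errors.
    private final Map<String, RuntimeException> fatalErrors = new HashMap<>();

    // Called when a metadata response reports an invalid topic.
    void recordInvalidTopic(String topic) {
        fatalErrors.put(topic, new InvalidTopicException("Invalid topics: [" + topic + "]"));
    }

    // Stand-in for poll(): clear and throw any accumulated fatal metadata
    // error before returning records, so the error is raised exactly once.
    void poll() {
        if (!fatalErrors.isEmpty()) {
            RuntimeException e = fatalErrors.values().iterator().next();
            fatalErrors.clear();
            throw e;
        }
        // ...fetch and return records in a real consumer...
    }
}
```

Under this shape, `subscribe("invalid topic")` followed by `poll()` would raise the exception on the first poll and not on subsequent ones, which is the behavior `testSubscriptionOnInvalidTopic` checks for.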
Re: [PR] KAFKA-15050: format the prompts in the quickstart [kafka]
joobisb commented on PR #13862:
URL: https://github.com/apache/kafka/pull/13862#issuecomment-2111521992

@tombentley @flavray I have addressed all the comments; can we merge this?
Re: [PR] MINOR: update leaderAndEpoch before initializing metadata publishers [kafka]
github-actions[bot] commented on PR #15366:
URL: https://github.com/apache/kafka/pull/15366#issuecomment-2111516055

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16763: Upgrade to scala 2.12.19 and scala 2.13.14 [kafka]
m1a2st commented on PR #15958:
URL: https://github.com/apache/kafka/pull/15958#issuecomment-2111504233

@chia7712 Please take a look at this PR. Thank you.
[PR] [DO NOT MERGE] KIP-924 rack info API option 1: add #taskIdToPartitionsRackIds and PartitionRackIds [kafka]
ableegoldman opened a new pull request, #15960:
URL: https://github.com/apache/kafka/pull/15960

(no comment)
[PR] [DO NOT MERGE] KIP-924 rack info API option 2: introduce TaskInfo with complete task metadata [kafka]
ableegoldman opened a new pull request, #15959:
URL: https://github.com/apache/kafka/pull/15959

(no comment)
[jira] [Commented] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846475#comment-17846475 ] Greg Harris commented on KAFKA-16765:
--

This is also a bug in EchoServer: [https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java#L76-L81] and ServerShutdownTest: [https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala#L274-L275], except those don't require a race condition to happen.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
> Issue Type: Bug
> Components: core, unit tests
> Affects Versions: 3.8.0
> Reporter: Greg Harris
> Priority: Minor
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new
> SocketChannel instances and insert them into the `newChannels` List, and a
> main thread that drains the `newChannels` List and moves them to the
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads
> to exit their while loops. It is possible for the NioEchoServer main thread
> to sense the serverSocketChannel close and terminate before the Acceptor
> thread does, and for the Acceptor thread to put a SocketChannel in
> `newChannels` before terminating. This instance is never closed by either
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have
> terminated.
> The end result is that the leaked socket is left open in the `newChannels`
> list at the end of close(), which is incorrect.
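One possible shape of a fix, sketched with stand-in types rather than the actual NioEchoServer code: have close() join the acceptor thread first, then drain and close whatever the acceptor managed to enqueue before terminating, so no accepted channel can be left open:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: the queue stands in for NioEchoServer's `newChannels`
// list. After the acceptor thread has been joined, nothing else can enqueue,
// so a final drain closes any channel the race left behind.
public class AcceptedChannelDrain {

    private final ConcurrentLinkedQueue<Closeable> newChannels = new ConcurrentLinkedQueue<>();

    // Called by the acceptor thread when accept() returns.
    public void enqueue(Closeable channel) {
        newChannels.add(channel);
    }

    // Called by close() after the acceptor thread has been joined: closes
    // everything still queued and reports how many channels were closed.
    public int drainAndClose() throws IOException {
        int closed = 0;
        Closeable ch;
        while ((ch = newChannels.poll()) != null) {
            ch.close();
            closed++;
        }
        return closed;
    }
}
```

The key ordering is join-then-drain: the drain is only safe once the acceptor thread is known to have terminated, which is exactly the step the buggy execution order skips.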
[jira] [Updated] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16768: Description: The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue. During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue. KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely different implementation but has the same flaw. An example execution order that includes this leak: 1. Acceptor#accept() is called, and a new SocketChannel is accepted. 2. Acceptor#assignNewConnection() begins 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor and attached Processor instances 4. Processor#run() checks the shouldRun variable, and exits the loop 5. Processor#closeAll() executes, and drains the `newConnections` variable 6. Processor#run() returns and the Processor thread terminates 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the SocketChannel to `newConnections` 8. Acceptor#assignNewConnection() returns 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the Acceptor thread terminates. 10. Acceptor#close() joins all of the terminated threads, and returns At the end of this sequence, there are still open SocketChannel instances in newConnections, which are then considered leaked. was: The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue. 
During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue. > SocketServer leaks accepted SocketChannel instances due to race condition > - > > Key: KAFKA-16768 > URL: https://issues.apache.org/jira/browse/KAFKA-16768 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Major > > The SocketServer has threads for Acceptors and Processors. These threads > communicate via Processor#accept/Processor#configureNewConnections and the > `newConnections` queue. > During shutdown, the Acceptor and Processors are each stopped by setting > shouldRun to false, and then shutdown proceeds asynchronously in all > instances together. This leads to a race condition where an Acceptor accepts > a SocketChannel and queues it to a Processor, but that Processor instance has > already started shutting down and has already drained the newConnections > queue. > KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely > different implementation but has the same flaw. > An example execution order that includes this leak: > 1. Acceptor#accept() is called, and a new SocketChannel is accepted. > 2. Acceptor#assignNewConnection() begins > 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor > and attached Processor instances > 4. Processor#run() checks the shouldRun variable, and exits the loop > 5. Processor#closeAll() executes, and drains the `newConnections` variable > 6. Processor#run() returns and the Processor thread terminates > 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the > SocketChannel to `newConnections` > 8. 
Acceptor#assignNewConnection() returns > 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the > Acceptor thread terminates. > 10. Acceptor#close() joins all of the terminated threads, and returns > At the end of this sequence, there are still open SocketChannel instances in > newConnections, which are then considered leaked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
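The ten-step interleaving described in KAFKA-16768 can be re-enacted deterministically with a plain queue standing in for the Processor's `newConnections`; the class and field names below are illustrative placeholders, not Kafka's actual implementation:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Deterministic re-enactment of the race: the processor drains its queue and
// exits (steps 3-6) before the acceptor finishes handing off the accepted
// connection (steps 7-8), so the entry enqueued last is never closed.
public class ShutdownRaceSketch {
    static final ConcurrentLinkedQueue<String> newConnections = new ConcurrentLinkedQueue<>();
    static volatile boolean shouldRun = true;

    public static void main(String[] args) {
        // Steps 3-6: close() flips the flag and Processor#closeAll drains the queue
        shouldRun = false;
        newConnections.clear();
        // Steps 7-8: the acceptor, already past its own shouldRun check for this
        // iteration, still calls Processor#accept and enqueues the channel
        newConnections.add("accepted-channel");
        // Step 10: all threads have terminated; nothing will ever dequeue this entry
        System.out.println("leaked connections: " + newConnections.size());
    }
}
```

Running it prints `leaked connections: 1` — the placeholder enqueued after the drain has no remaining consumer, which is the leak the report describes for real SocketChannel instances.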
[jira] [Created] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
Greg Harris created KAFKA-16768:
-----------------------------------

             Summary: SocketServer leaks accepted SocketChannel instances due to race condition
                 Key: KAFKA-16768
                 URL: https://issues.apache.org/jira/browse/KAFKA-16768
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.8.0
            Reporter: Greg Harris


The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
[ https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846471#comment-17846471 ] 黃竣陽 commented on KAFKA-16763: - I will handle this issue. > Upgrade to scala 2.12.19 and scala 2.13.14 > -- > > Key: KAFKA-16763 > URL: https://issues.apache.org/jira/browse/KAFKA-16763 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19) > > scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]
ableegoldman commented on PR #15944: URL: https://github.com/apache/kafka/pull/15944#issuecomment-2111377214 Merged to trunk. I'll update the KIP and send out a notice about the change from interface to class and the new APIs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]
ableegoldman merged PR #15944: URL: https://github.com/apache/kafka/pull/15944 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment [kafka]
ableegoldman commented on PR #15944: URL: https://github.com/apache/kafka/pull/15944#issuecomment-2111376581 The test env seems pretty unstable right now but we have at least one clean build for each java version if you look at the two latest runs. All test failures are unrelated as well. Seems safe to merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]
ableegoldman commented on PR #15920: URL: https://github.com/apache/kafka/pull/15920#issuecomment-2111374003 Merged to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]
ableegoldman commented on PR #15920: URL: https://github.com/apache/kafka/pull/15920#issuecomment-2111373393 The test env seems pretty unstable right now but we have at least one clean build for each java version if you look at the two latest runs. All test failures are unrelated as well. Seems safe to merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]
ableegoldman merged PR #15920: URL: https://github.com/apache/kafka/pull/15920 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467 ] A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM: -- Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in KAFKA-15170 Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will though was (Author: ableegoldman): Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170] Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. 
Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467 ] A. Sophie Blee-Goldman commented on KAFKA-16361: Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170] Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. 
I'm pretty confident it will though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: 
rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader =
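Option 1 from the comment above (disabling rack-awareness) comes down to consumer configuration: the rack-aware assignment path only activates when consumers advertise a rack id via `client.rack` (KIP-881). A minimal sketch of such a config — the bootstrap address, group id, and assignor choice are placeholders for illustration:

```java
import java.util.Properties;

public class AssignorWorkaroundSketch {
    // Sketch of the workaround: leave "client.rack" unset so the assignor has
    // no rack information and skips the rack-aware matching code entirely.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");        // placeholder
        props.put("group.id", "example-group");                 // placeholder
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.StickyAssignor");
        // Deliberately NOT setting "client.rack": without a rack id per consumer,
        // the assignor cannot correlate consumers with replica racks.
        return props;
    }

    public static void main(String[] args) {
        System.out.println("client.rack set: " + consumerProps().containsKey("client.rack"));
    }
}
```

This trades away locality-aware fetching until the patched release (3.8.0 / 3.7.1) is available.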
[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15170: --- Fix Version/s: 3.8.0 3.7.1 > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor use ConstrainedAssignmentBuilder to build assignment > when all consumers in group subscribe the same topic list, but it couldn't > add all partitions move owner to another consumer to > ``partitionsWithMultiplePreviousOwners``. > > the reason is in function assignOwnedPartitions hasn't add partitions that > rack-mismatch with prev owner to allRevokedPartitions, then partition only in > this list would add to partitionsWithMultiplePreviousOwners. > > In Cooperative Rebalance, partitions have changed owner must be removed from > final assignment or will lead to incorrect consume behavior, I have already > raise a pr, please take a look, thx > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15170. Resolution: Fixed > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor use ConstrainedAssignmentBuilder to build assignment > when all consumers in group subscribe the same topic list, but it couldn't > add all partitions move owner to another consumer to > ``partitionsWithMultiplePreviousOwners``. > > the reason is in function assignOwnedPartitions hasn't add partitions that > rack-mismatch with prev owner to allRevokedPartitions, then partition only in > this list would add to partitionsWithMultiplePreviousOwners. > > In Cooperative Rebalance, partitions have changed owner must be removed from > final assignment or will lead to incorrect consume behavior, I have already > raise a pr, please take a look, thx > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]
ableegoldman commented on PR #13965: URL: https://github.com/apache/kafka/pull/13965#issuecomment-2111359637 Merged to trunk and cherrypicked back to 3.7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]
ableegoldman commented on PR #15416: URL: https://github.com/apache/kafka/pull/15416#issuecomment-2111358899 FYI I cherrypicked this back to 3.7 while cherrypicking back another sticky assignor fix. Should be in the 3.7.1 release -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16277: --- Fix Version/s: 3.7.1 > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Cameron Redpath >Assignee: Cameron Redpath >Priority: Major > Fix For: 3.8.0, 3.7.1 > > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! 
> Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question if the behaviour of the assignor should be revised, so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in much more even > distribution of load. The behaviour would then be: > * With two consumers, 6 partitions from each topic go to each consumer > * With three consumers, 4 partitions from each topic go to each consumer > * With four consumers, 3 partitions from each topic go to each consumer > > Of note, we only saw this behaviour after migrating to the > `CooperativeStickyAssignor`. It was not an issue with the default partition > assignment strategy. > > It is possible this may be intended behaviour. In which case, what is the > preferred workaround for our scenario? Our current workaround if we decide to > go ahead with the update to `CooperativeStickyAssignor` may be to limit our > consumers so they only subscribe to one topic, and have two consumer threads > per instance of the application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
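The even distribution the reporter proposes is simple to state: spread each topic's partitions over all consumers independently, round-robin per topic. A toy model of that arithmetic (not Kafka's assignor, just the numbers from the issue):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerTopicSpreadSketch {
    // Per-consumer partition counts when each topic's partitions are assigned
    // round-robin across all consumers, independently of other topics.
    static Map<String, Map<String, Integer>> assign(List<String> topics,
                                                    int partitionsPerTopic,
                                                    int consumers) {
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (int c = 0; c < consumers; c++) counts.put("consumer-" + c, new HashMap<>());
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts.get("consumer-" + (p % consumers)).merge(topic, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2 topics x 12 partitions over 2 consumers -> 6 partitions of each topic each,
        // matching the "With two consumers" line in the issue
        System.out.println(assign(List.of("topic-1", "topic-2"), 12, 2));
    }
}
```

With this per-topic spread, the 10x traffic skew between the two topics lands evenly on every group member instead of concentrating one topic on one consumer.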
Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]
ableegoldman merged PR #13965: URL: https://github.com/apache/kafka/pull/13965 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment [kafka]
ableegoldman commented on PR #13965: URL: https://github.com/apache/kafka/pull/13965#issuecomment-2111341726 Test failures are unrelated, merging to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781735 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## Review Comment: I added a test to `AsyncKafkaConsumerTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781636 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## Review Comment: Oh, the code definitely has smells! 😉 I added a test to `ConsumerNetworkThread`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] improve log description of QuorumController [kafka]
chickenchickenlove commented on code in PR #15926: URL: https://github.com/apache/kafka/pull/15926#discussion_r160061 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog( throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " + "created in KRaft mode."); } +logMessageBuilder +.append("since this is a de-novo KRaft cluster."); Review Comment: This is better 👍 I made a new commit to apply your comments. Please take another look when you have free time 🙇♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
gaurav-narula commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1600768789 ## metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.LogDeltaManifest; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Timeout(value = 40) +public class BrokerRegistrationTrackerTest { +static final Uuid INCARNATION_ID = Uuid.fromString("jyjLbk31Tpa53pFrU9Y-Ng"); + +static final Uuid A = Uuid.fromString("Ahw3vXfnThqeZbb7HD1w6Q"); + +static final Uuid B = Uuid.fromString("BjOacT0OTNqIvUWIlKhahg"); + +static final Uuid C = Uuid.fromString("CVHi_iv2Rvy5_1rtPdasfg"); + +static class BrokerRegistrationTrackerTestContext { +AtomicInteger numCalls = new AtomicInteger(0); +BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1, +Arrays.asList(B, A), () -> numCalls.incrementAndGet()); + +MetadataImage image = MetadataImage.EMPTY; + +void onMetadataUpdate(MetadataDelta delta) { +MetadataProvenance provenance = new MetadataProvenance(0, 0, 0); +image = delta.apply(provenance); +LogDeltaManifest manifest = new LogDeltaManifest.Builder(). +provenance(provenance). +leaderAndEpoch(LeaderAndEpoch.UNKNOWN). +numBatches(1). +elapsedNs(1). +numBytes(1). +build(); +tracker.onMetadataUpdate(delta, image, manifest); +} + +MetadataDelta newDelta() { +return new MetadataDelta.Builder(). +setImage(image). 
+build(); +} +} + +@Test +public void testTrackerName() { +BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext(); +assertEquals("BrokerRegistrationTracker(id=1)", ctx.tracker.name()); +} + +@Test +public void testMetadataVersionUpdateWithoutRegistrationDoesNothing() { +BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext(); +MetadataDelta delta = ctx.newDelta(); +delta.replay(new FeatureLevelRecord(). +setName(MetadataVersion.FEATURE_NAME). +setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel())); +ctx.onMetadataUpdate(delta); +assertEquals(0, ctx.numCalls.get()); +} + +@Test +public void testBrokerUpdateWithoutNewMvDoesNothing() { +BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext(); +MetadataDelta delta = ctx.newDelta(); +delta.replay(new RegisterBrokerRecord(). +setBrokerId(1). +setIncarnationId(INCARNATION_ID). +setLogDirs(Arrays.asList(A, B, C))); +ctx.onMetadataUpdate(delta); +assertEquals(0, ctx.numCalls.get()); +} + +@ParameterizedTest +@ValueSource(booleans = {false, true}) +public void testBrokerUpdateWithNewMv(boolean jbodMv) { +BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext(); +MetadataDelta delta = ctx.newDelta(); +delta.replay(new RegisterBrokerRecord(). +setBrokerId(1). +setIncarnationId(INCARNATION_ID). +setLogDirs(Arrays.asList())); +delta.replay(new Featur
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273 ## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GzipCompressionTest { + +@Test +public void testCompressionDecompression() throws IOException { +GzipCompression.Builder builder = Compression.gzip(); +byte[] data = "data".getBytes(StandardCharsets.UTF_8); Review Comment: Should we test sth bigger like more than 512 bytes so that it covers the out.flush() logic? ## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GzipCompressionTest { + +@Test +public void testCompressionDecompression() throws IOException { +GzipCompression.Builder builder = Compression.gzip(); +byte[] data = "data".getBytes(StandardCharsets.UTF_8); + +for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { +for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) { +GzipCompression compression = builder.level(level).build(); +ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); +try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(data); +out.flush(); +} +bufferStream.buffer().flip(); + +try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) { +byte[] result = new byte[data.length]; +int read = inputStream.read(result); +assertEquals(data.length, read); +assertArrayEquals(data, result); +} +} +} +
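The reviewer's suggestion — exercising the flush path with a payload larger than 512 bytes — can be illustrated with a plain-JDK gzip round trip. This is a sketch using `java.util.zip`, not Kafka's `GzipCompression`/`ByteBufferOutputStream`; the class and method names here are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripDemo {

    // Compress and then decompress, returning the recovered bytes.
    static byte[] roundTrip(byte[] data) {
        try {
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (GZIPOutputStream out = new GZIPOutputStream(compressed)) {
                out.write(data);
            }
            try (GZIPInputStream in =
                     new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
                return in.readAllBytes();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Payload well over 512 bytes, per the review suggestion, with
        // non-constant content so compression does some real work.
        byte[] data = new byte[2048];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) (i % 251);
        }
        byte[] result = roundTrip(data);
        if (!Arrays.equals(data, result)) {
            throw new AssertionError("round trip mismatch");
        }
        System.out.println("round-tripped " + result.length + " bytes");
    }
}
```

The same shape (large buffer, round trip, byte-for-byte comparison) carries over directly to the `GzipCompressionTest` under review.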
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16764: -- Fix Version/s: 3.8.0 > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example.The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes that > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fails for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16767) KRaft should track HWM outside the log layer
José Armando García Sancio created KAFKA-16767: -- Summary: KRaft should track HWM outside the log layer Key: KAFKA-16767 URL: https://issues.apache.org/jira/browse/KAFKA-16767 Project: Kafka Issue Type: Improvement Components: kraft Reporter: José Armando García Sancio The current implementation of KRaft tracks the HWM using the log layer implementation. The log layer has an invariant where the HWM <= LEO. This means that the log layer always sets the HWM to the minimum of HWM and LEO. This has the side effect of the local KRaft replica reporting an HWM that is much smaller than the leader's HWM when the replica starts with an empty log, e.g. a new broker or the kafka-metadata-shell. -- This message was sent by Atlassian Jira (v8.20.10#820010)
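The clamping behaviour behind this side effect can be sketched in a few lines (hypothetical names; the real log layer is far more involved):

```java
public class HighWatermarkDemo {

    // The log layer enforces HWM <= LEO, so accepting an HWM from the
    // leader effectively clamps it to the local log end offset.
    static long updateHighWatermark(long leaderHwm, long localLogEndOffset) {
        return Math.min(leaderHwm, localLogEndOffset);
    }

    public static void main(String[] args) {
        // A replica starting with an empty log (LEO = 0) reports HWM 0,
        // far below the leader's HWM of 5000 -- the issue described above.
        System.out.println(updateHighWatermark(5000L, 0L)); // prints 0
        // Once the local log has caught up, the leader's HWM passes through.
        System.out.println(updateHighWatermark(5000L, 6000L)); // prints 5000
    }
}
```

Tracking the HWM outside the log layer, as the ticket proposes, would let the replica report the leader's HWM even while its own log is still catching up.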
[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846446#comment-17846446 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], sorry I had missed your last question. The new group rebalance protocol from KIP-848 is supported in KRaft mode only. > AsyncKafkaConsumer removes offset fetch responses from cache too aggressively > - > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
[ https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16766: --- Description: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw an org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. The same situation exists for beginningOffsets and endOffsets. All 3 functions show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the Async consumer, so we should fix all three. With the fix, we should write tests for each function, like the ones defined for the Legacy Consumer ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). Note that we would need different tests, added to AsyncKafkaConsumerTest, given that the AsyncConsumer issues a FindCoordinator request in this case but the legacy consumer does not, so the current tests do not account for that when matching requests/responses. 
was: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exist for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses. > New consumer offsetsForTimes timeout exception does not have the proper > message > --- > > Key: KAFKA-16766 > URL: https://issues.apache.org/jira/browse/KAFKA-16766 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer > will throw a org.apache.kafka.common.errors.TimeoutException as expected, but > with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same > message that the LegacyConsumer shows in this case, ex: "Failed to get > offsets by times in 6ms". > To fix this we should consider catching the timeout exception in the consumer > when offsetsForTimes result times out > ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), > and propagate it with the message specific to offsetsForTimes. > Same situation exists for beginningOffsets and endOffsets. All 3 funcs show > the same timeout message in the LegacyConsumer (defined > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), > but do not have a clear message in the Async, so we should fix them all 3. > With the fix, we should write
[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
Lianet Magrans created KAFKA-16766: -- Summary: New consumer offsetsForTimes timeout exception does not have the proper message Key: KAFKA-16766 URL: https://issues.apache.org/jira/browse/KAFKA-16766 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Fix For: 3.8.0 If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw an org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exists for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
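The proposed fix — catch the `java.util.concurrent.TimeoutException` where the offsetsForTimes result times out and rethrow it with an API-specific message — might look roughly like this. All names here are hypothetical, and `RuntimeException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutMessageDemo {

    // Wait for an async result; on timeout, rethrow with a descriptive
    // message instead of surfacing the bare
    // "java.util.concurrent.TimeoutException" string to the caller.
    static <T> T getWithClearTimeout(CompletableFuture<T> future, Duration timeout, String apiName) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException(
                "Failed to get " + apiName + " in " + timeout.toMillis() + "ms", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> neverCompletes = new CompletableFuture<>();
        try {
            getWithClearTimeout(neverCompletes, Duration.ofMillis(10), "offsets by times");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // "Failed to get offsets by times in 10ms"
        }
    }
}
```

The same wrapper shape would cover beginningOffsets and endOffsets by passing a different `apiName`.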
[jira] [Created] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition
Greg Harris created KAFKA-16765: --- Summary: NioEchoServer leaks accepted SocketChannel instances due to race condition Key: KAFKA-16765 URL: https://issues.apache.org/jira/browse/KAFKA-16765 Project: Kafka Issue Type: Bug Components: core, unit tests Affects Versions: 3.8.0 Reporter: Greg Harris The NioEchoServer has an AcceptorThread that calls accept() to open new SocketChannel instances and insert them into the `newChannels` List, and a main thread that drains the `newChannels` List and moves them to the `socketChannels` List. During shutdown, the serverSocketChannel is closed, which causes both threads to exit their while loops. It is possible for the NioEchoServer main thread to sense the serverSocketChannel close and terminate before the Acceptor thread does, and for the Acceptor thread to put a SocketChannel in `newChannels` before terminating. This instance is never closed by either thread, because it is never moved to `socketChannels`. A precise execution order that has this leak is:
1. NioEchoServer thread locks `newChannels`.
2. Acceptor thread accept() completes, and the SocketChannel is created.
3. Acceptor thread blocks waiting for the `newChannels` lock.
4. NioEchoServer thread releases the `newChannels` lock and does some processing.
5. NioEchoServer#close() is called, which closes the serverSocketChannel.
6. NioEchoServer thread checks serverSocketChannel.isOpen() and then terminates.
7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel to `newChannels`.
8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
9. NioEchoServer#close() stops blocking now that both other threads have terminated.
The end result is that the leaked socket is left open in the `newChannels` list at the end of close(), which is incorrect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
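One possible remediation for this class of leak (a sketch, not the actual NioEchoServer code) is to have close() perform a final drain of `newChannels` after both worker threads have terminated, so a channel published in step 7 still gets closed:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public class DrainOnCloseDemo {

    // A trivial Closeable that records whether close() was called,
    // standing in for an accepted SocketChannel.
    static class FakeChannel implements Closeable {
        volatile boolean closed = false;
        @Override public void close() { closed = true; }
    }

    private final List<Closeable> newChannels = new ArrayList<>();

    // Stands in for the acceptor thread publishing a just-accepted channel.
    synchronized void accept(Closeable channel) {
        newChannels.add(channel);
    }

    // Called after the acceptor and main threads have been joined:
    // drain anything still sitting in newChannels instead of leaking it.
    synchronized void closeServer() {
        for (Closeable leftover : newChannels) {
            try {
                leftover.close();
            } catch (Exception e) {
                // best-effort cleanup during shutdown
            }
        }
        newChannels.clear();
    }

    public static void main(String[] args) {
        DrainOnCloseDemo server = new DrainOnCloseDemo();
        FakeChannel straggler = new FakeChannel();
        server.accept(straggler);  // acceptor raced past the main thread's exit
        server.closeServer();      // final drain closes the straggler
        System.out.println("straggler closed: " + straggler.closed); // true
    }
}
```

The key point is that the drain happens after both threads are known to be done, so no new channel can arrive behind it.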
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16764: --- Description: A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) This is probably what makes the [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] test fail for the new consumer. Once this bug is fixed, we should be able to enable that test for the new consumer. was: A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 
The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example.The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes that > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fails for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
Lianet Magrans created KAFKA-16764: -- Summary: New consumer should throw InvalidTopicException on poll when invalid topic in metadata Key: KAFKA-16764 URL: https://issues.apache.org/jira/browse/KAFKA-16764 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
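The legacy consumer's approach — rechecking metadata for fatal errors on every network-poll iteration so they surface from consumer.poll — can be sketched with stand-in classes (hypothetical names; `IllegalStateException` stands in for Kafka's `InvalidTopicException`, and the fake below mimics `Metadata.maybeThrowAnyException()`):

```java
public class MetadataErrorCheckDemo {

    // Stand-in for the client metadata object, which can record a fatal
    // error (e.g. an invalid topic discovered in a metadata response).
    static class FakeMetadata {
        private volatile RuntimeException fatalError;
        void fail(RuntimeException e) { this.fatalError = e; }
        void maybeThrowAnyException() {
            if (fatalError != null) throw fatalError;
        }
    }

    // Each poll iteration checks for metadata exceptions before doing any
    // network I/O, so a bad subscription fails fast from poll().
    static void pollOnce(FakeMetadata metadata) {
        metadata.maybeThrowAnyException();
        // ... network I/O would happen here ...
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata();
        metadata.fail(new IllegalStateException("Invalid topics: [invalid topic]"));
        try {
            pollOnce(metadata);
        } catch (IllegalStateException e) {
            System.out.println("poll surfaced: " + e.getMessage());
        }
    }
}
```

The fix for the new consumer would wire an equivalent check into its poll path so the same exception reaches the caller.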
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +125,71 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + +class CheckpointRecordHandler { +private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + +void handle(Throwable error, ConsumerRecord cpRecord) { +// See KafkaBasedLog.poll : only KafkaException can be passed as error +if (error instanceof KafkaException) { +// only log once +if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { +log.error("Error loading Checkpoint topic", error); +lastLoggedErrorReadingCheckpoints = (KafkaException) error; +} + +if (error instanceof RetriableException) { +return; +} else { +throw (KafkaException) error; +} +} else { // error is null +lastLoggedErrorReadingCheckpoints = null; +Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); Review Comment: deserialization can fail due to bad data in the topic ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +125,71 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + +class CheckpointRecordHandler { +private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + +void handle(Throwable error, ConsumerRecord cpRecord) { +// See KafkaBasedLog.poll : only KafkaException can be 
passed as error +if (error instanceof KafkaException) { +// only log once +if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { +log.error("Error loading Checkpoint topic", error); +lastLoggedErrorReadingCheckpoints = (KafkaException) error; +} + +if (error instanceof RetriableException) { +return; +} else { +throw (KafkaException) error; +} +} else { // error is null +lastLoggedErrorReadingCheckpoints = null; +Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); +if (consumerGroups.contains(cp.consumerGroupId())) { +Map cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>()); +cps.put(cp.topicPartition(), cp); +} +} +} +} + +CheckpointRecordHandler handler = new CheckpointRecordHandler(); +TopicAdmin cpAdmin = null; +KafkaBasedLog previousCheckpoints = null; + +try { +cpAdmin = new TopicAdmin( +config.targetAdminConfig("checkpoint-target-admin"), + config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"))); + +previousCheckpoints = KafkaBasedLog.withExistingClients( +config.checkpointsTopic(), + MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)), +null, +cpAdmin, +(error, cpRecord) -> handler.handle(error, cpRecord), +Time.SYSTEM, +ignored -> { }, +topicPartition -> topicPartition.partition() == 0); + +log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic()); +previousCheckpoints.start(true); +previousCheckpoints.stop(); +log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic()); +log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup); +return true; +} catch (KafkaException kexc)
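Regarding the review comment above that deserialization can fail due to bad data in the topic: a defensive handler that logs and skips corrupt records rather than crashing the task might look like this sketch (a hypothetical fixed-width record format, not MirrorMaker's actual `Checkpoint.deserializeRecord`):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SafeDeserializeDemo {

    // Hypothetical deserializer: expects at least one big-endian long.
    static long deserialize(byte[] value) {
        if (value == null || value.length < Long.BYTES) {
            throw new IllegalArgumentException("truncated record");
        }
        return ByteBuffer.wrap(value).getLong();
    }

    // Guarded handler: a corrupt record is logged and skipped, so one bad
    // message in the topic cannot abort the whole restore pass.
    static List<Long> handleAll(List<byte[]> values) {
        List<Long> out = new ArrayList<>();
        for (byte[] v : values) {
            try {
                out.add(deserialize(v));
            } catch (RuntimeException e) {
                System.err.println("Skipping bad checkpoint record: " + e.getMessage());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> values = List.of(
            ByteBuffer.allocate(8).putLong(42L).array(),
            new byte[3]); // corrupt: too short to hold a long
        System.out.println(handleAll(values)); // prints [42]
    }
}
```

Whether to skip, dead-letter, or fail on corrupt checkpoint records is a policy choice; the sketch only shows the try/catch placement the comment asks for.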
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1600493799 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync( return EMPTY_RESULT; } +/** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ +private CoordinatorResult classicGroupSyncToConsumerGroup( +ConsumerGroup group, +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); + +ConsumerGroupMember member; +if (instanceId == null) { +member = group.getOrMaybeCreateMember(request.memberId(), false); +} else { +member = group.staticMember(instanceId); +if (member == null) { +throw new UnknownMemberIdException( +String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) +); +} +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +} + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdOrProtocolUnmatched( +group, +member, +request.generationId(), +request.protocolType(), +request.protocolName() +); + +cancelConsumerGroupSyncTimeout(groupId, memberId); +//scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); Review Comment: I just merged it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add classic member session timeout to ClassicMemberMetadata [kafka]
dajac merged PR #15921: URL: https://github.com/apache/kafka/pull/15921 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: revisit LogValidatorTest#checkRecompression [kafka]
chia7712 commented on PR #15948: URL: https://github.com/apache/kafka/pull/15948#issuecomment-2110831638 @junrao thanks for all your reviews :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
chia7712 commented on PR #15838: URL: https://github.com/apache/kafka/pull/15838#issuecomment-2110829824 > The new test seems to be failing: [testReduceNumNetworkThreads() – kafka.server.KRaftClusterTest](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15838/1/tests) can you check on it? see https://github.com/apache/kafka/pull/15838#discussion_r1585911858 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]
chia7712 commented on PR #15885: URL: https://github.com/apache/kafka/pull/15885#issuecomment-2110828020 > looks like we still suffer from thread leaks in CI :( I've rebased from trunk to trigger CI again I have noticed that too. so sad :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: revisit LogValidatorTest#checkRecompression [kafka]
junrao commented on code in PR #15948: URL: https://github.com/apache/kafka/pull/15948#discussion_r1600406699 ## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ## @@ -497,9 +497,11 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") -// Both V2 and V1 has single branch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp -// is the last offset of the single branch -assertEquals(1, records.batches().asScala.size) +// V2 has single batch, and other versions has many single-record batches +assertEquals(if (magic >= RecordBatch.MAGIC_VALUE_V2) 1 else 3, records.batches().asScala.size) +// Both V2 and V1 has single batch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp Review Comment: in the record => in the validated records ## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ## @@ -497,9 +497,11 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") -// Both V2 and V1 has single branch in the record when compression is enable, and hence their shallow OffsetOfMaxTimestamp -// is the last offset of the single branch -assertEquals(1, records.batches().asScala.size) +// V2 has single batch, and other versions has many single-record batches +assertEquals(if (magic >= RecordBatch.MAGIC_VALUE_V2) 1 else 3, records.batches().asScala.size) Review Comment: Should we move this assert to immediate after records is created? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] improve log description of QuorumController [kafka]
mumrah commented on code in PR #15926: URL: https://github.com/apache/kafka/pull/15926#discussion_r1600407308 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog( throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " + "created in KRaft mode."); } +logMessageBuilder +.append("since this is a de-novo KRaft cluster."); Review Comment: How about "This is expected because this is a de-novo KRaft cluster." ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16759: Handle telemetry push response while terminating [kafka]
AndrewJSchofield opened a new pull request, #15957: URL: https://github.com/apache/kafka/pull/15957 When client telemetry is configured in a cluster, Kafka producers and consumers push metrics to the brokers periodically. There is a special push of metrics that occurs when the client is terminating. A state machine in the client telemetry reporter controls its behaviour in different states. Sometimes, when a client was terminating, it was attempting an invalid state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED when it receives a response to a PushTelemetry RPC. This was essentially harmless because the state transition did not occur but it did cause unsightly log lines to be generated. This PR performs a check for the terminating states when receiving the response and simply remains in the current state. I added a test to validate the state management in this case. Actually, the test passes before the code change in the PR, but with unsightly log lines. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
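The guard described in the PR — remaining in the current state when a push response arrives during termination, instead of attempting the invalid TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED transition — can be sketched as follows (state names match the PR description, but the transition function is a simplified stand-in for the real client telemetry reporter):

```java
public class TelemetryStateDemo {

    enum State { PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATING_PUSH_IN_PROGRESS, TERMINATED }

    // On a PushTelemetry response: if we are already terminating, stay put
    // rather than attempting an invalid transition back to PUSH_NEEDED.
    static State onPushResponse(State current) {
        if (current == State.TERMINATING_PUSH_IN_PROGRESS || current == State.TERMINATED) {
            return current;
        }
        return State.PUSH_NEEDED;
    }

    public static void main(String[] args) {
        System.out.println(onPushResponse(State.PUSH_IN_PROGRESS));             // PUSH_NEEDED
        System.out.println(onPushResponse(State.TERMINATING_PUSH_IN_PROGRESS)); // TERMINATING_PUSH_IN_PROGRESS
    }
}
```

This mirrors the PR's observation that the bad transition was already rejected by the state machine; the check just avoids attempting it (and logging about it) in the first place.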
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
mumrah commented on PR #15838: URL: https://github.com/apache/kafka/pull/15838#issuecomment-2110742012 The new test seems to be failing: [`testReduceNumNetworkThreads() – kafka.server.KRaftClusterTest`](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15838/1/tests) can you check on it?
[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-14567: -- Fix Version/s: 3.8.0 > Kafka Streams crashes after ProducerFencedException > --- > > Key: KAFKA-14567 > URL: https://issues.apache.org/jira/browse/KAFKA-14567 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Assignee: Kirk True >Priority: Blocker > Labels: eos > Fix For: 3.8.0 > > > Running a Kafka Streams application with EOS-v2. > We first see a `ProducerFencedException`. After the fencing, the fenced > thread crashed resulting in a non-recoverable error: > {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] > stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream > task 1_2 due to the following error: > (org.apache.kafka.streams.processor.internals.TaskExecutor) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. 
taskId=1_2, processor=KSTREAM-SOURCE-05, > topic=node-name-repartition, partition=2, offset=539776276, > stacktrace=java.lang.IllegalStateException: TransactionalId > stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition > attempted from state FATAL_ERROR to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) > at > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) > at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) > at > org.apache.kafka.streams.processor.
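The root of the stack trace above is a guarded state transition inside the producer's transaction manager. The following is an illustrative sketch of that failure mode, not the real TransactionManager: once the transactional producer reaches FATAL_ERROR, moving to ABORTABLE_ERROR is illegal, which is exactly the IllegalStateException the fenced Streams thread hit:

```java
// Illustrative only: a FATAL_ERROR transaction state is terminal; attempting
// to downgrade it to ABORTABLE_ERROR raises the exception quoted above.
enum TxnState { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

class TxnSketch {
    static TxnState transitionToAbortableError(TxnState current) {
        if (current == TxnState.FATAL_ERROR) {
            throw new IllegalStateException(
                "Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR");
        }
        return TxnState.ABORTABLE_ERROR;
    }
}
```

A fenced thread therefore has to treat the producer as unusable rather than try to abort through it, which is the behaviour the ticket asks Streams to adopt.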
[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-14567: -- Affects Version/s: 3.7.0 (was: 3.8.0)
[jira] [Assigned] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-14567: - Assignee: Kirk True
[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-14567: -- Priority: Blocker (was: Major)
[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-14567: -- Affects Version/s: 3.8.0
Re: [PR] KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest [kafka]
gaurav-narula commented on PR #15885: URL: https://github.com/apache/kafka/pull/15885#issuecomment-2110720158 @chia7712 looks like we still suffer from thread leaks in CI :( I've rebased from trunk to trigger CI again
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
gaurav-narula commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1600362319 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. 
+ * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param idThe ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { Review Comment: Is this only for the logging? Seems like we unconditionally pass `newImage.features().metadataVersion()` in `brokerRegistrationNeedsRefresh` down below and it only gates on `registration == null` 🤔 I tried commenting lines 78-86 out and the tests still pass which makes me wonder if `checkBrokerRegistration` can be simplified or if there's a test we're missing. 
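The "compare the desired state with the actual state" idea in the class javadoc above can be sketched as a single predicate. The method name and signature below are assumptions for illustration, not the real BrokerRegistrationTracker API:

```java
// Illustrative sketch of the refresh check: a broker registered before
// MetadataVersion 3.7-IV2 carries no directories, so once the cluster reaches
// a JBOD-capable metadata version that registration must be resent.
class RegistrationSketch {
    static boolean registrationNeedsRefresh(
            boolean metadataVersionSupportsDirectories, // i.e. MV >= 3.7-IV2
            String[] registeredDirectories,             // directories in the current registration
            String[] localDirectories) {                // directories this broker manages
        return metadataVersionSupportsDirectories
                && registeredDirectories.length == 0
                && localDirectories.length > 0;
    }
}
```

Framed this way, the reviewer's question amounts to whether the extra `checkBrokerRegistration` gating adds anything beyond calling a predicate like this on every relevant delta.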
[PR] KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities [kafka]
apourchet opened a new pull request, #15956: URL: https://github.com/apache/kafka/pull/15956 The graph solving utilities are currently hardcoded to work with ClientState, but don't actually depend on anything in those state classes. This change allows the MinTrafficGraphConstructor and BalanceSubtopologyGraphConstructor to be reused with KafkaStreamsStates instead. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
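The shape of the refactor described above is ordinary Java generification: make the solver depend only on a type parameter so the same code serves ClientState today and KafkaStreamsState after the change. The interface and method names below are illustrative assumptions, not the actual Streams internals:

```java
// Sketch: the cost function is the only thing that touches the state type,
// so the solver itself is generic over S.
interface CostFunction<S> {
    int cost(S state, int task); // cost of assigning the given task to this state
}

class MinCostPicker {
    // Works for any state type S; nothing here names a concrete state class.
    static <S> S pickCheapest(java.util.List<S> states, int task, CostFunction<S> f) {
        S best = states.get(0);
        for (S s : states) {
            if (f.cost(s, task) < f.cost(best, task)) {
                best = s;
            }
        }
        return best;
    }
}
```

The design win is the one the PR states: the graph constructors never needed anything from ClientState, so lifting the type into a parameter lets both constructors be reused without duplication.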
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846377#comment-17846377 ] Luke D commented on KAFKA-16361: [~ableegoldman] , the bug is not present in 3.4 from our local reproduction test case. The API available to introduce the conditions as we are testing them are not present in 3.3 and below. So from our specific presentation of the bug it appears to be a regression introduced in 3.5. > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name,
[PR] MINOR: Fix warnings in streams javadoc [kafka]
mimaison opened a new pull request, #15955: URL: https://github.com/apache/kafka/pull/15955

Fixes the following warnings (`./gradlew streams:javadoc`):
```
> Task :streams:javadoc
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "assignment:" is not a parameter name.
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "subscription:" is not a parameter name.
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:74: warning - @param argument "error:" is not a parameter name.
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Missing closing '}' character for inline tag: "{@code auto.include.jmx.reporter"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:579: warning - Missing closing '}' character for inline tag: "{@code default.windowed.key.serde.inner"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:587: warning - Missing closing '}' character for inline tag: "{@code default.windowed.value.serde.inner"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
/Users/mickael/github/kafka/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:663: warning - Tag @link: reference not found: JMX_REPORTER "jmx.reporter"
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
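The warnings fixed above come from two common javadoc mistakes: a `@param` tag whose argument carries a trailing colon (so it no longer matches any parameter name), and a `{@code ...}` inline tag missing its closing brace. A minimal illustrative sketch follows; the class, method, and config names are hypothetical, not the actual Kafka sources:

```java
// Illustrative sketch only; JavadocExample and its members are hypothetical names,
// not taken from the Kafka codebase.
public class JavadocExample {

    // Broken forms that the javadoc tool rejects (shown as regular comments so this compiles):
    //   @param assignment: the computed assignment   <- trailing colon: the tag argument
    //                                                   "assignment:" matches no parameter
    //   {@code some.config.name                      <- inline tag missing its closing brace

    /**
     * Reports the computed assignment, mentioning the
     * {@code hypothetical.example.config} configuration.
     *
     * @param assignment the computed assignment (no trailing colon; matches the parameter name)
     * @return a short summary string
     */
    public static String report(String assignment) {
        return "assignment=" + assignment;
    }

    public static void main(String[] args) {
        System.out.println(report("t0"));
    }
}
```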
[jira] [Created] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
Chia-Ping Tsai created KAFKA-16763: -- Summary: Upgrade to scala 2.12.19 and scala 2.13.14 Key: KAFKA-16763 URL: https://issues.apache.org/jira/browse/KAFKA-16763 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19) scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1600314641 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync( return EMPTY_RESULT; } +/** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ +private CoordinatorResult classicGroupSyncToConsumerGroup( +ConsumerGroup group, +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); + +ConsumerGroupMember member; +if (instanceId == null) { +member = group.getOrMaybeCreateMember(request.memberId(), false); +} else { +member = group.staticMember(instanceId); +if (member == null) { +throw new UnknownMemberIdException( +String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) +); +} +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +} + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdOrProtocolUnmatched( +group, +member, +request.generationId(), +request.protocolType(), +request.protocolName() +); + Review Comment: I wanted to make it return REBALANCE_IN_PROGRESS but we seem to have no way to know whether a new rebalance is triggered in sync group... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1600311785 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync( return EMPTY_RESULT; } +/** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ +private CoordinatorResult classicGroupSyncToConsumerGroup( +ConsumerGroup group, +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); + +ConsumerGroupMember member; +if (instanceId == null) { +member = group.getOrMaybeCreateMember(request.memberId(), false); +} else { +member = group.staticMember(instanceId); +if (member == null) { +throw new UnknownMemberIdException( +String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) +); +} +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +} + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdOrProtocolUnmatched( +group, +member, +request.generationId(), +request.protocolType(), +request.protocolName() +); + +cancelConsumerGroupSyncTimeout(groupId, memberId); +//scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); Review Comment: Needs https://github.com/apache/kafka/pull/15921 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 opened a new pull request, #15954: URL: https://github.com/apache/kafka/pull/15954 This pr implements the sync group api for the consumer groups that are in the mixed mode. In `classicGroupSyncToConsumerGroup`, the `assignedPartitions` calculated in the JoinGroup will be returned as the assignment in the sync response and the member session timeout will be rescheduled. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]
wcarlson5 merged PR #15919: URL: https://github.com/apache/kafka/pull/15919 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup
Dongnuo Lyu created KAFKA-16762: --- Summary: SyncGroup API for upgrading ConsumerGroup Key: KAFKA-16762 URL: https://issues.apache.org/jira/browse/KAFKA-16762 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16762) SyncGroup API for upgrading ConsumerGroup
[ https://issues.apache.org/jira/browse/KAFKA-16762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu reassigned KAFKA-16762: --- Assignee: Dongnuo Lyu > SyncGroup API for upgrading ConsumerGroup > - > > Key: KAFKA-16762 > URL: https://issues.apache.org/jira/browse/KAFKA-16762 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]
wcarlson5 commented on PR #15919: URL: https://github.com/apache/kafka/pull/15919#issuecomment-2110611187 There were no new or related test failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951: URL: https://github.com/apache/kafka/pull/15951#discussion_r1600301660 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -314,6 +314,77 @@ class ReplicaManagerTest { } } + @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}") + @ValueSource(booleans = Array(true, false)) + def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = { +val dir1 = TestUtils.tempDir() +val dir2 = TestUtils.tempDir() +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) +val config = KafkaConfig.fromProps(props) +val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) +val spyLogManager = spy(logManager) +val metadataCache: MetadataCache = mock(classOf[MetadataCache]) +mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) +val tp0 = new TopicPartition(topic, 0) +val uuid = Uuid.randomUuid() +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = spyLogManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +try { + val partition = rm.createPartition(tp0) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, +new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, +Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + 
.setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, +Collections.singletonMap(topic, Uuid.randomUuid()), Review Comment: Is it possible that `LeaderAndIsrRequest` carries a different topic id ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16539) Can't update specific broker configs in pre-migration mode
[ https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846365#comment-17846365 ] David Arthur commented on KAFKA-16539: -- [~chia7712] here's a cherry-pick to 3.7 https://github.com/apache/kafka/pull/15953 > Can't update specific broker configs in pre-migration mode > -- > > Key: KAFKA-16539 > URL: https://issues.apache.org/jira/browse/KAFKA-16539 > Project: Kafka > Issue Type: Bug > Components: config, kraft >Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > In migration mode, ZK brokers will have a forwarding manager configured. This > is used to forward requests to the KRaft controller once we get to that part > of the migration. However, prior to KRaft taking over as the controller > (known as pre-migration mode), the ZK brokers are still attempting to forward > IncrementalAlterConfigs to the controller. > This works fine for cluster level configs (e.g., "-entity-type broker > --entity-default"), but this fails for specific broker configs (e.g., > "-entity-type broker --entity-id 1"). > This affects BROKER and BROKER_LOGGER config types. > To workaround this bug, you can either disable migrations on the brokers > (assuming no migration has taken place), or proceed with the migration and > get to the point where KRaft is the controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Cherry-pick KAFKA-16539 to 3.7 [kafka]
mumrah opened a new pull request, #15953: URL: https://github.com/apache/kafka/pull/15953

This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this.

*Forwarding*

During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail. With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft.

*Protected ZK Writes*

As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates.

Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK after the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller. The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.

Reviewers: Luke Chen, Akhilesh Chaganti, Chia-Ping Tsai

Conflicts: Minor conflicts due to the config refactoring in trunk
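The forwarding half of the fix described above boils down to one conditional in the IAC handler: forward only once a KRaft controller is actually active, and otherwise stay on the local ZK path so the broker-specific BROKER/BROKER_LOGGER logic still runs. A minimal sketch of that decision, with hypothetical names, not the actual KafkaApis code:

```java
import java.util.Optional;

// Hypothetical sketch of the routing decision described in the PR text above;
// IacRoutingSketch and its members are made-up names, not Kafka's real API.
public class IacRoutingSketch {

    enum ControllerKind { ZK, KRAFT }

    static String handleIncrementalAlterConfigs(ControllerKind activeController,
                                                Optional<Object> forwardingManager) {
        // Old behavior: forward unconditionally whenever a forwarding manager exists,
        // which is always true for ZK brokers running in migration mode.
        // Fixed behavior: forward only when the active controller is KRaft, so that
        // pre-migration requests keep taking the local ZK path.
        if (activeController == ControllerKind.KRAFT && forwardingManager.isPresent()) {
            return "forwarded-to-kraft-controller";
        }
        return "handled-locally-on-zk-path";
    }

    public static void main(String[] args) {
        Optional<Object> fm = Optional.of(new Object()); // migration mode: forwarding enabled
        System.out.println(handleIncrementalAlterConfigs(ControllerKind.ZK, fm));    // pre-migration
        System.out.println(handleIncrementalAlterConfigs(ControllerKind.KRAFT, fm)); // after takeover
    }
}
```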
[jira] [Updated] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16406: --- Labels: consumer kip-848-client-support (was: ) > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer, kip-848-client-support > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This led to this being one of the longest-running integration tests, so in > the aim of reducing the impact on the build times we could split it to allow > for parallelization. The tests covers multiple areas of the consumer logic, > in a single file, so splitting based on the high-level features being tested > would be sensible and achieve the result wanted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16406. -- > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 >
[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16406. Resolution: Fixed > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 >
[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join
[ https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846363#comment-17846363 ] Ayoub Omari commented on KAFKA-16700: - [~stoeckmk] About KIP-962, it's not about null foreign keys, it's only about null keys of the left topic. So null foreign keys are still behaving the same way as before the KIP. > Kafka Streams: possible message loss on KTable-KTable FK Left Join > -- > > Key: KAFKA-16700 > URL: https://issues.apache.org/jira/browse/KAFKA-16700 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 > Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and > 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka > Operators >Reporter: Karsten Stöckmann >Priority: Major > Labels: dsl, joins, streams > > We are experiencing significant, yet intermittent / non-deterministic / > unexplainable message loss on a Kafka Streams topology while performing a > *KTable-KTable* {*}FK Left Join{*}. > Assume the following snippet: > {code:java} > streamsBuilder > .table( > folderTopicName, > Consumed.with( > folderKeySerde, > folderSerde)) > .leftJoin( > agencies, // KTable > Folder::agencyIdValue, > AggregateFolder::new, > TableJoined.as("folder-to-agency"), > Materialized > .as("folder-to-agency-materialized") > .withKeySerde(folderKeySerde) > .withValueSerde(aggregateFolderSerde)) > .leftJoin( > documents, > {code} > The setup is as follows: > A Debezium Connector for PostgreSQL streams database changes into various > Kafka topics. A series of Quarkus Kafka Streams applications then performs > aggregation operations on those topics to create index documents later to be > sent into an OpenSearch system. > When firing up the Kafka Streams infrastructure to work on initially > populated Kafka Topics (i.e. 
a snapshot of all relevant table data has been > streamed to Kafka), the above shown KTable-KTable FK Left Join seems to > produce message loss on the first of a series of FK Left Joins; the right > hand {{KTable}} is consumed from an aggregated > topic fed from another Kafka Streams topology / application. > On a (heavily reduced) test data set of 6828 messages in the > {{folderTopicName}} Topic, we observe the following results: > * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages > * {{{}folder-to-agency-subscription-response{}}}: *3048* messages > * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages > * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages. > Telling from the nature of a (FK) Left Join, I'd expect all messages from the > left hand topic should produce an aggregate even if no matching message is > found in the right hand topic. > Message loss unpredictably varies across tests and seems not to be bound to > specific keys or messages. > As it seems, this can only be observed when initially firing up the Streams > infrastructure to process the message 'backlog' that had been snapshotted by > Debezium. A manual snapshot triggered later (i.e. Streams applications > already running) seems not to show this behaviour. Additionally, as of yet we > observed this kind of message loss only when running multiple replicas of the > affected application. When carrying out the tests with only one replica, > everything seems to work as expected. We've tried to leverage > {{group.initial.rebalance.delay.ms}} in order to rule out possible > rebalancing issues, but to no avail. 
> Our Kafka configuration: > {code:yaml} > offsets.topic.replication.factor: 3 > transaction.state.log.replication.factor: 3 > transaction.state.log.min.isr: 2 > default.replication.factor: 3 > min.insync.replicas: 2 > message.max.bytes: "20971520" > {code} > Our Kafka Streams application configuration: > {code:yaml} > kafka-streams.num.stream.threads: 5 > kafka-streams.num.standby.replicas: 1 > kafka-streams.auto.offset.reset: earliest > kafka-streams.cache.max.bytes.buffering: "20971520" > kafka-streams.commit.interval.ms: 100 > kafka-streams.fetch.max.bytes: "10485760" > kafka-streams.max.request.size: "10485760" > kafka-streams.max.partition.fetch.bytes: "10485760" > kafka-streams.metadata.max.age.ms: 30 > kafka-streams.statestore.cache.max.bytes: "20971520" > kafka-streams.topology.optimization: all > kafka-streams.processing.guarantee: exactly_once_v2 > # Kafka Streams Intermediate Topics > kafka-streams.topic.compression.type: lz4 > kafk
Re: [PR] KAFKA-16671: enable test for ensureInternalEndpointIsSecured [kafka]
FrankYang0529 commented on code in PR #15868: URL: https://github.com/apache/kafka/pull/15868#discussion_r1600286651 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java: ## @@ -108,8 +110,8 @@ public void ensureInternalEndpointIsSecured() throws Throwable { connectorTasksEndpoint ); assertEquals( -BAD_REQUEST.getStatusCode(), -connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus() +BAD_REQUEST.getStatusCode(), Review Comment: Revert it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16759) Invalid client telemetry transition on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-16759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846360#comment-17846360 ] Andrew Schofield commented on KAFKA-16759: -- Analysing the transitions, the invalid transition is real. {noformat} +++ SUBSCRIPTION_NEEDED -->> SUBSCRIPTION_IN_PROGRESS +++ SUBSCRIPTION_IN_PROGRESS -->> PUSH_NEEDED ^C+++ PUSH_NEEDED -->> TERMINATING_PUSH_NEEDED +++ TERMINATING_PUSH_NEEDED -->> TERMINATING_PUSH_IN_PROGRESS +++ TERMINATING_PUSH_IN_PROGRESS -->> PUSH_NEEDED [2024-05-14 16:48:05,043] WARN Error updating client telemetry state, disabled telemetry (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) java.lang.IllegalStateException: Invalid telemetry state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED; the valid telemetry state transitions from TERMINATING_PUSH_IN_PROGRESS are: TERMINATED at org.apache.kafka.common.telemetry.ClientTelemetryState.validateTransition(ClientTelemetryState.java:163) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.maybeSetState(ClientTelemetryReporter.java:828) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.handleResponse(ClientTelemetryReporter.java:520) at org.apache.kafka.clients.NetworkClient$TelemetrySender.handleResponse(NetworkClient.java:1321) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:948) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:594) at org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:130) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:140) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88) [2024-05-14 16:48:05,044] WARN Unable to transition state after successful push telemetry from state 
TERMINATING_PUSH_IN_PROGRESS (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) +++ TERMINATING_PUSH_IN_PROGRESS -->> TERMINATED{noformat} > Invalid client telemetry transition on consumer close > - > > Key: KAFKA-16759 > URL: https://issues.apache.org/jira/browse/KAFKA-16759 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Minor > Fix For: 3.8.0 > > > Using the console consumer with client telemetry enabled, I hit an invalid > state transition when closing the consumer with CTRL-C. The consumer sends a > final "terminating" telemetry push which puts the client telemetry reporter > into TERMINATING_PUSH_IN_PROGRESS state. When it receives a response in this > state, it attempts an invalid state transition. > > {noformat} > [2024-05-13 19:19:35,804] WARN Error updating client telemetry state, > disabled telemetry > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) > java.lang.IllegalStateException: Invalid telemetry state transition from > TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED; the valid telemetry state > transitions from TERMINATING_PUSH_IN_PROGRESS are: TERMINATED > at > org.apache.kafka.common.telemetry.ClientTelemetryState.validateTransition(ClientTelemetryState.java:163) > at > org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.maybeSetState(ClientTelemetryReporter.java:827) > at > org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.handleResponse(ClientTelemetryReporter.java:520) > at > org.apache.kafka.clients.NetworkClient$TelemetrySender.handleResponse(NetworkClient.java:1321) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:948) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:594) > at > 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:130) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.sendUnsentRequests(ConsumerNetworkThread.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.cleanup(ConsumerNetworkThread.java:275) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:95) > [2024-05-13 19:19:35,805] WARN Unable to transition state after successful > push telemetry from state TERMINATING_PUSH_IN_PROGRESS > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
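The transition table implied by the traces above can be sketched as a small state machine. The enum below is a hypothetical, simplified model (the real class is `org.apache.kafka.common.telemetry.ClientTelemetryState`, and its actual transition table may differ); it only illustrates why, after the terminating push completes, the sender must move to TERMINATED rather than back to PUSH_NEEDED:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the client telemetry state machine
// described in this issue; the transition table is an illustrative subset.
enum TelemetryState {
    SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS,
    TERMINATING_PUSH_NEEDED, TERMINATING_PUSH_IN_PROGRESS, TERMINATED;

    private static Set<TelemetryState> validTargets(TelemetryState s) {
        switch (s) {
            case SUBSCRIPTION_NEEDED:      return EnumSet.of(SUBSCRIPTION_IN_PROGRESS);
            case SUBSCRIPTION_IN_PROGRESS: return EnumSet.of(PUSH_NEEDED, TERMINATING_PUSH_NEEDED);
            case PUSH_NEEDED:              return EnumSet.of(PUSH_IN_PROGRESS, TERMINATING_PUSH_NEEDED);
            case PUSH_IN_PROGRESS:         return EnumSet.of(PUSH_NEEDED, TERMINATING_PUSH_NEEDED);
            case TERMINATING_PUSH_NEEDED:  return EnumSet.of(TERMINATING_PUSH_IN_PROGRESS);
            // The bug: on the response to the terminating push, the sender tried
            // to move back to PUSH_NEEDED, but only TERMINATED is valid here.
            case TERMINATING_PUSH_IN_PROGRESS: return EnumSet.of(TERMINATED);
            default:                       return EnumSet.noneOf(TelemetryState.class);
        }
    }

    TelemetryState validateTransition(TelemetryState next) {
        if (!validTargets(this).contains(next)) {
            throw new IllegalStateException(
                "Invalid telemetry state transition from " + this + " to " + next);
        }
        return next;
    }
}
```

With a table like this, the fix described in the issue amounts to making the sender request the TERMINATED target after a successful terminating push, rather than the PUSH_NEEDED target it uses for ordinary pushes.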
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1600269458 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -43,7 +42,7 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin implements ProcessorSupplier { +abstract class KStreamKStreamJoin implements ProcessorSupplier { Review Comment: Just to throw my own explanation in here: VLeft and VRight are "absolute" in that they are the left and right types of the overall join. Both sides of the join have equivalent VLeft and VRight types, because they share a common outerJoinStore instance. VThis and VOther are "relative" in that they are the type of records entering "this" side of the join, and the "other" side of the join, and this is necessarily swapped for the other side of the join. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
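The "absolute" versus "relative" type parameters described in the comment above can be illustrated with a small standalone sketch (not the actual Kafka Streams code): the joiner is declared once over the absolute types VLeft/VRight shared by both sides, while each side works with relative types VThis/VOther, and the right side swaps the arguments when calling the shared joiner.

```java
import java.util.function.BiFunction;

// Illustrative sketch of the typing scheme: one joiner over the absolute
// types (VLeft, VRight) serves both sides; each side sees relative types
// (VThis, VOther), which are swapped on the right side.
final class JoinSide<VThis, VOther, VOut> {
    private final BiFunction<VThis, VOther, VOut> joiner;

    private JoinSide(BiFunction<VThis, VOther, VOut> joiner) {
        this.joiner = joiner;
    }

    VOut join(VThis thisValue, VOther otherValue) {
        return joiner.apply(thisValue, otherValue);
    }

    // Left side: VThis = VLeft, VOther = VRight, so the joiner applies directly.
    static <VLeft, VRight, VOut> JoinSide<VLeft, VRight, VOut> left(
            BiFunction<VLeft, VRight, VOut> joiner) {
        return new JoinSide<>(joiner);
    }

    // Right side: VThis = VRight, VOther = VLeft, so swap before calling the
    // shared joiner; both sides still agree on the absolute (VLeft, VRight).
    static <VLeft, VRight, VOut> JoinSide<VRight, VLeft, VOut> right(
            BiFunction<VLeft, VRight, VOut> joiner) {
        return new JoinSide<VRight, VLeft, VOut>((r, l) -> joiner.apply(l, r));
    }
}
```

Because both sides are built from the same `BiFunction<VLeft, VRight, VOut>`, they can also share state keyed on the absolute types, which mirrors the shared `outerJoinStore` instance mentioned above.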
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600267022 ## clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.compress; + +import com.github.luben.zstd.BufferPool; +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ChunkedBytesStream; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class ZstdCompression implements Compression { + +public static final int MIN_LEVEL = Zstd.minCompressionLevel(); +public static final int MAX_LEVEL = Zstd.maxCompressionLevel(); +public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel(); + +private final int level; + +private ZstdCompression(int level) { +this.level = level; +} + +@Override +public CompressionType type() { +return CompressionType.ZSTD; +} + +@Override +public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) { +try { +// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance +// in cases where the caller passes a small number of bytes to write (potentially a single byte). 
+return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(bufferStream, RecyclingBufferPool.INSTANCE, level), 16 * 1024); +} catch (Throwable e) { +throw new KafkaException(e); +} +} + +@Override +public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { +try { +return new ChunkedBytesStream(wrapForZstdInput(buffer, decompressionBufferSupplier), +decompressionBufferSupplier, +decompressionOutputSize(), +false); +} catch (Throwable e) { +throw new KafkaException(e); +} +} + +// visible for testing +public ZstdInputStreamNoFinalizer wrapForZstdInput(ByteBuffer buffer, BufferSupplier decompressionBufferSupplier) throws IOException { Review Comment: I considered inlining this method, as it's only used in a test in `DefaultRecordBatchTest`, which kind of tests the internal behavior of `ZstdInputStreamNoFinalizer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
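The rationale for the 16 KB `BufferedOutputStream` wrapper in `wrapForOutput` above can be demonstrated in isolation. This is an illustrative sketch (a counting stream in place of the real zstd stream, since each unbuffered write would otherwise cross into native code): it shows how buffering coalesces many single-byte writes into a handful of large ones.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Counts calls into the underlying stream, standing in for the per-write
// cost of ZstdOutputStreamNoFinalizer (which crosses a JNI boundary).
final class CountingOutputStream extends OutputStream {
    int writeCalls = 0;
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();

    @Override
    public void write(int b) throws IOException {
        writeCalls++;
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        writeCalls++;
        delegate.write(b, off, len);
    }

    // Write `bytes` single bytes, with or without a 16 KB buffer in front,
    // and report how many calls reached the underlying stream.
    static int callsFor(boolean buffered, int bytes) throws IOException {
        CountingOutputStream counter = new CountingOutputStream();
        OutputStream out = buffered ? new BufferedOutputStream(counter, 16 * 1024) : counter;
        for (int i = 0; i < bytes; i++)
            out.write(i & 0xFF); // many tiny writes, as a caller might issue
        out.flush();
        return counter.writeCalls;
    }
}
```

Unbuffered, 1000 single-byte writes hit the underlying stream 1000 times; with the 16 KB buffer in front they collapse to a single bulk write on flush.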
[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15284: -- Fix Version/s: 4.0.0 > Implement ConsumerGroupProtocolVersionResolver to determine consumer group > protocol > --- > > Key: KAFKA-15284 > URL: https://issues.apache.org/jira/browse/KAFKA-15284 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > At client initialization, we need to determine which of the > {{ConsumerDelegate}} implementations to use: > # {{LegacyKafkaConsumerDelegate}} > # {{AsyncKafkaConsumerDelegate}} > There are conditions defined by KIP-848 that determine client eligibility to > use the new protocol. This will be modeled by the—deep > breath—{{{}ConsumerGroupProtocolVersionResolver{}}}. > Known tasks: > * Determine at what point in the {{Consumer}} initialization the network > communication should happen > * Determine what RPCs to invoke in order to determine eligibility (API > versions, IBP version, etc.) > * Implement the network client lifecycle (startup, communication, shutdown, > etc.) > * Determine the fallback path in case the client is not eligible to use the > protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
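The delegate selection this ticket describes can be sketched roughly as follows. The class names `LegacyKafkaConsumerDelegate` and `AsyncKafkaConsumerDelegate` come from the ticket; the `resolve` signature and the boolean eligibility input are hypothetical, since in reality eligibility would be determined by the network communication (API versions, IBP version, etc.) listed in the known tasks.

```java
// Hedged sketch of ConsumerDelegate selection; not the eventual Kafka API.
interface ConsumerDelegate { }

class LegacyKafkaConsumerDelegate implements ConsumerDelegate { }

class AsyncKafkaConsumerDelegate implements ConsumerDelegate { }

class ConsumerGroupProtocolVersionResolver {
    // "consumer" is the KIP-848 group.protocol value; "classic" is the old one.
    // brokerSupportsNewProtocol stands in for the real RPC-based eligibility
    // checks, to keep the sketch self-contained.
    ConsumerDelegate resolve(boolean brokerSupportsNewProtocol, String groupProtocol) {
        if ("consumer".equalsIgnoreCase(groupProtocol) && brokerSupportsNewProtocol) {
            return new AsyncKafkaConsumerDelegate();
        }
        // Fallback path: the client is not eligible for the new protocol.
        return new LegacyKafkaConsumerDelegate();
    }
}
```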
[jira] [Updated] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor
[ https://issues.apache.org/jira/browse/KAFKA-15697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15697: -- Fix Version/s: 4.0.0 > Add local assignor and ensure it cannot be used with server side assignor > - > > Key: KAFKA-15697 > URL: https://issues.apache.org/jira/browse/KAFKA-15697 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Philip Nee >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > When we start supporting a local/client-side assignor, we should: > # Add the config to ConsumerConfig > # Examine where we should implement the logic to ensure it is not used > alongside the server-side assignor, i.e. you can specify only the local or the > remote assignor, or neither. > ## If both assignors are specified: throw IllegalArgumentException -- This message was sent by Atlassian Jira (v8.20.10#820010)
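The mutual-exclusion check asked for in the ticket is small. A minimal sketch, assuming hypothetical parameter names (the actual config keys would live in ConsumerConfig):

```java
// Minimal sketch of the validation this ticket asks for: a local
// (client-side) assignor and a server-side assignor must never be
// configured together. Parameter names are illustrative.
final class AssignorConfigValidator {
    static void validate(String localAssignorClass, String serverAssignorName) {
        if (localAssignorClass != null && serverAssignorName != null) {
            throw new IllegalArgumentException(
                "Only one of the local assignor and the server-side assignor may be configured, not both");
        }
        // Configuring neither is allowed: the broker falls back to its default
        // server-side assignor.
    }
}
```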
[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors
[ https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15282: -- Fix Version/s: 4.0.0 > Implement client support for KIP-848 client-side assignors > -- > > Key: KAFKA-15282 > URL: https://issues.apache.org/jira/browse/KAFKA-15282 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > The client-side assignor provides the logic for the partition assignments > instead of on the server. Client-side assignment is the main approach used by > the “old protocol” for divvying up partitions. While the “new protocol” > favors server-side assignors, the client-side assignor will continue to be > used for backward compatibility, including KSQL, Connect, etc. > Note: I _*think*_ that the client-side assignor logic and the reconciliation > logic can remain separate from each other. We should strive to keep the two > pieces unencumbered, unless it’s unavoidable. > This task includes: > * Validate the client’s configuration for assignor selection > * Integrate with the new {{PartitionAssignor}} interface to invoke the logic > from the user-provided assignor implementation > * Implement the necessary logic around the request/response from the > {{ConsumerGroupPrepareAssignment}} RPC call using the information from the > {{PartitionAssignor}} above > * Implement the necessary logic around the request/response from the > {{ConsumerGroupInstallAssignment}} RPC call, again using the information > calculated by the {{PartitionAssignor}} > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs
[ https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15279: -- Fix Version/s: 4.0.0 > Implement client support for KIP-848 client-side assigner RPCs > -- > > Key: KAFKA-15279 > URL: https://issues.apache.org/jira/browse/KAFKA-15279 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > The protocol introduces three new RPCs that the client uses to communicate > with the broker: > # > [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] > # > [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI] > # > [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI] > Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to > implement the ConsumerGroupAssignmentRequestManager to handle the second and > third RPCs on the above list. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600263287 ## clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ChunkedBytesStream; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class Lz4Compression implements Compression { + +public static final int MIN_LEVEL = 1; +public static final int MAX_LEVEL = 17; +public static final int DEFAULT_LEVEL = 9; Review Comment: I hesitated defining these constants for this reason but these levels have not changed over 10 years [0], so hopefully this won't require a lot of maintenance. 
0: https://github.com/lz4/lz4-java/blame/master/src/java/net/jpountz/lz4/LZ4Constants.java#L23-L24 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
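The level constants under discussion imply a range check wherever a user-supplied level is accepted. A sketch of that validation, using the values from the patch (lz4-java levels 1..17, default 9); the helper class and method names here are illustrative, not the final `Compression` API:

```java
// Illustrative level validation around the LZ4 constants discussed above.
final class Lz4Levels {
    static final int MIN_LEVEL = 1;      // lowest lz4-java compression level
    static final int MAX_LEVEL = 17;     // highest lz4-java compression level
    static final int DEFAULT_LEVEL = 9;  // used when no level is configured

    static int checkLevel(Integer level) {
        if (level == null)
            return DEFAULT_LEVEL;
        if (level < MIN_LEVEL || level > MAX_LEVEL)
            throw new IllegalArgumentException(
                "lz4 compression level must be in [" + MIN_LEVEL + ", " + MAX_LEVEL + "], got " + level);
        return level;
    }
}
```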
[jira] [Commented] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846354#comment-17846354 ] Kirk True commented on KAFKA-16406: --- [~lianetm]—both PRs are closed. Can this be marked as resolved? > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This made it one of the longest-running integration tests, so with the aim of > reducing the impact on build times we could split it to allow for > parallelization. The test covers multiple areas of the consumer logic in a > single file, so splitting based on the high-level features being tested would > be sensible and achieve the desired result. -- This message was sent by Atlassian Jira (v8.20.10#820010)