[GitHub] [kafka] yashmayya opened a new pull request, #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode
yashmayya opened a new pull request, #13375: URL: https://github.com/apache/kafka/pull/13375

- The existing `EmbeddedConnectCluster` (used in Connect integration tests) uses a backing `EmbeddedKafkaCluster`, which internally also spins up an `EmbeddedZookeeper`.
- This patch migrates the `EmbeddedKafkaCluster` to run in KRaft mode, leveraging the existing [KafkaClusterTestKit](https://github.com/apache/kafka/blob/5f6a050bfee09b634497f9ba35e2964289be1e4d/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L80) from `core`.
- Connect / MirrorMaker integration tests which set up a Kafka cluster with `SASL_PLAINTEXT` or `SSL` listeners needed to be updated to account for the controller listeners as well, and to switch the authorizer from `kafka.security.authorizer.AclAuthorizer` to `org.apache.kafka.metadata.authorizer.StandardAuthorizer` (see [KIP-801](https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata)), since the old authorizer relied on ZooKeeper.
- The existing `EmbeddedKafkaCluster` had some logic to restart brokers and have them listen on the same ports as before (in order to verify Connect's functionality when its backing Kafka cluster goes down and then comes back up). This was refactored to move the responsibility of using a fixed port in the broker's listener config to the tests themselves.
- Some changes to `KafkaClusterTestKit` to allow externally configuring listener configurations, plus other minor improvements.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
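The last two bullet points above hinge on tests reserving a fixed port themselves rather than the cluster remembering its old ports. A minimal sketch of that test idiom (hypothetical class and method names, not the actual Connect test code) might look like:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;

// Sketch: a test that needs the broker to come back on the same port after a
// restart reserves a port up front and pins it in the listener config.
public class FixedPortListenerSketch {

    // Reserve an ephemeral port by binding to port 0 and reading back what the
    // OS assigned. Slightly racy (another process could grab it before the
    // broker binds), but a common test idiom.
    public static int reserveFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    // Pin the reserved port in the broker's listeners config so a restarted
    // broker listens on the same address as before.
    public static Properties brokerConfigWithFixedPort(int port) {
        Properties props = new Properties();
        props.put("listeners", "PLAINTEXT://localhost:" + port);
        return props;
    }

    public static void main(String[] args) throws IOException {
        int port = reserveFreePort();
        System.out.println(brokerConfigWithFixedPort(port).getProperty("listeners"));
    }
}
```

The same `Properties` would then be passed into the embedded cluster before each start, so the port survives a stop/start cycle.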
[jira] [Assigned] (KAFKA-10807) AlterConfig should be validated by the target broker
[ https://issues.apache.org/jira/browse/KAFKA-10807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth reassigned KAFKA-10807:
------------------------------
Assignee: Vinoth

> AlterConfig should be validated by the target broker
>
> Key: KAFKA-10807
> URL: https://issues.apache.org/jira/browse/KAFKA-10807
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Vinoth
> Priority: Major
>
> After forwarding is enabled, AlterConfigs will no longer be sent to the
> target broker. This behavior bypasses some important config change
> validations, such as path existence, static config conflict, or even worse
> when the target broker is offline, the propagated result does not reflect a
> true applied result. We should gather those necessary cases, and decide
> whether to actually handle the AlterConfig request firstly on the target
> broker, and then forward, in a validate-forward-apply path.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
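The "validate-forward-apply" path described in the issue can be sketched roughly as follows. All names here are hypothetical (this is not the actual KafkaApis code); the point is only the ordering: validate on the target broker first, then forward, then apply.

```java
// Hypothetical sketch of a validate-forward-apply flow: the target broker
// validates the config change locally before forwarding to the controller,
// so validation errors surface even though the apply happens later.
public class ValidateForwardApplySketch {
    interface ConfigStore { void apply(String key, String value); }

    static String handleAlterConfig(String key, String value, boolean isController, ConfigStore store) {
        // 1. Validate on the broker that owns the resource (e.g. path
        //    existence, conflicts with static configs).
        if (key == null || key.isEmpty()) {
            return "INVALID_CONFIG";
        }
        // 2. Forward to the controller if we are not it (the recursive call
        //    here stands in for the network hop).
        if (!isController) {
            return handleAlterConfig(key, value, true, store);
        }
        // 3. Apply only after validation and forwarding succeeded.
        store.apply(key, value);
        return "NONE";
    }

    public static void main(String[] args) {
        System.out.println(handleAlterConfig("retention.ms", "60000", false, (k, v) -> { }));
    }
}
```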
[jira] [Resolved] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema
[ https://issues.apache.org/jira/browse/KAFKA-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth resolved KAFKA-10657.
----------------------------
Resolution: Resolved

Has been fixed in https://issues.apache.org/jira/browse/KAFKA-10525

> Incorporate Envelope into auto-generated JSON schema
>
> Key: KAFKA-10657
> URL: https://issues.apache.org/jira/browse/KAFKA-10657
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Vinoth
> Priority: Major
>
> We need to add support to output JSON format for the embedded request inside
> Envelope to do better request logging.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema]
[GitHub] [kafka] showuon commented on a diff in pull request #10244: KAFKA-12399: Deprecate Log4J Appender
showuon commented on code in PR #10244: URL: https://github.com/apache/kafka/pull/10244#discussion_r1131920152

## config/tools-log4j2.properties:
## @@ -12,14 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+name=ToolsConfig
+status=OFF

-# Define the root logger with appender file
-log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
+appenders=stderr

-log4j.appender.FILE=org.apache.log4j.FileAppender
-log4j.appender.FILE.File={{ log_file }}
-log4j.appender.FILE.ImmediateFlush=true
-# Set the append to false, overwrite
-log4j.appender.FILE.Append=false
-log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
+appender.stderr.type=Console
+appender.stderr.name=STDERR
+appender.stderr.layout.type=PatternLayout
+appender.stderr.layout.pattern=[%d] %p %m (%c)%n
+
+rootLogger.level=WARN

Review Comment: The original default is "INFO", why do we change to WARN?

## docs/upgrade.html:
## @@ -25,6 +25,195 @@ Notable changes in 3
 which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See https://issues.apache.org/jira/browse/KAFKA-13598 for more details). This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.
+
+The log4j 1.x based log4j-appender is now deprecated and no longer recommended. This artifact will be released for a while, but will be removed at the next major release.

Review Comment: If we decided to add this in v3.5, this upgrade doc should also move to 3.5.
[jira] [Commented] (KAFKA-14776) Update SCRAM system tests to run with KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698719#comment-17698719 ]

Proven Provenzano commented on KAFKA-14776:
-------------------------------------------
[PR #13374|https://github.com/apache/kafka/pull/13374] includes both the KIP-900 implementation and updated system tests.

> Update SCRAM system tests to run with KRaft
>
> Key: KAFKA-14776
> URL: https://issues.apache.org/jira/browse/KAFKA-14776
> Project: Kafka
> Issue Type: Improvement
> Components: kraft
> Reporter: Proven Provenzano
> Assignee: Proven Provenzano
> Priority: Major
>
> I will update the SCRAM system tests to run under both ZK and KRaft quorum
> mode.
[jira] [Assigned] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema
[ https://issues.apache.org/jira/browse/KAFKA-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth reassigned KAFKA-10657:
------------------------------
Assignee: Vinoth

> Incorporate Envelope into auto-generated JSON schema
>
> Key: KAFKA-10657
> URL: https://issues.apache.org/jira/browse/KAFKA-10657
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Vinoth
> Priority: Major
>
> We need to add support to output JSON format for the embedded request inside
> Envelope to do better request logging.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema]
[GitHub] [kafka] showuon commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
showuon commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463202757

The test code, I mean. We already have some unit tests for `ProducerPerformance` [here](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java). Thanks.
[GitHub] [kafka] pprovenzano opened a new pull request, #13374: KRAFT-14765
pprovenzano opened a new pull request, #13374: URL: https://github.com/apache/kafka/pull/13374

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463200864

> @hudeqi , thanks for the patch. This change makes sense to me. Could you add tests for this change?

Do you want to add test screenshots/results for this case, or the test code corresponding to this change?
[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463199545
[GitHub] [kafka] hudeqi closed pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi closed pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed URL: https://github.com/apache/kafka/pull/13348
[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463199231
[GitHub] [kafka] tinaselenge opened a new pull request, #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
tinaselenge opened a new pull request, #13373: URL: https://github.com/apache/kafka/pull/13373

Implements https://cwiki.apache.org/confluence/display/KAFKA/KIP-894%3A+Use+incrementalAlterConfigs+API+for+syncing+topic+configurations

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio merged pull request #13366: MINOR; Export control record type value
jsancio merged PR #13366: URL: https://github.com/apache/kafka/pull/13366
[GitHub] [kafka] jsancio commented on pull request #13366: MINOR; Export control record type value
jsancio commented on PR #13366: URL: https://github.com/apache/kafka/pull/13366#issuecomment-1462954942 Merging. Unrelated test failures.
[GitHub] [kafka] cmccabe merged pull request #13337: MINOR: Refactor the MetadataPublisher interface
cmccabe merged PR #13337: URL: https://github.com/apache/kafka/pull/13337
[GitHub] [kafka] cmccabe commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface
cmccabe commented on code in PR #13337: URL: https://github.com/apache/kafka/pull/13337#discussion_r1131705968

## metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java:
## @@ -200,7 +201,24 @@ void resetSnapshotCounters() { }
 @Override
-public void publishSnapshot(
+public void onMetadataUpdate(
+    MetadataDelta delta,
+    MetadataImage newImage,
+    LoaderManifest manifest
+) {
+    switch (manifest.type()) {
+        case LOG_DELTA:
+            publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+            break;
+        case SNAPSHOT:
+            publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+            break;
+        default:
+            break;

Review Comment: Actually, let's just remove the `default` case. spotbugs will flag non-exhaustive switch statements on enums. And that's a hard build error for us.
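The suggestion above, removing the `default` arm so static analysis can flag any enum constant the switch misses, can be illustrated with a small standalone sketch (illustrative names, not the actual Kafka classes):

```java
// Sketch: an exhaustive switch over an enum with no `default` arm. If a new
// constant is added later, tools like spotbugs can flag this switch as
// non-exhaustive, turning a silent no-op into a build error.
public class ExhaustiveSwitchSketch {
    enum ManifestType { LOG_DELTA, SNAPSHOT }

    static String describe(ManifestType type) {
        switch (type) {
            case LOG_DELTA:
                return "delta";
            case SNAPSHOT:
                return "snapshot";
        }
        // Unreachable for the current constants; javac still requires a
        // terminal statement because it cannot prove the switch is exhaustive.
        throw new IllegalStateException("Unhandled manifest type: " + type);
    }

    public static void main(String[] args) {
        System.out.println(describe(ManifestType.LOG_DELTA));
    }
}
```

The trailing `throw` gives the same future-proofing as the `IllegalStateException` in a `default` arm (as suggested elsewhere in this review), while keeping the switch itself visible to non-exhaustiveness checks.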
[GitHub] [kafka] rondagostino commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface
rondagostino commented on PR #13337: URL: https://github.com/apache/kafka/pull/13337#issuecomment-1462835684 Looks like compile failures in `BrokerMetadataPublisherTest.scala`
[GitHub] [kafka] rondagostino commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface
rondagostino commented on code in PR #13337: URL: https://github.com/apache/kafka/pull/13337#discussion_r1131578840

## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
## @@ -213,32 +212,13 @@ class BrokerMetadataPublisher(
 }
 // Apply configuration deltas.
-dynamicConfigPublisher.publish(delta, newImage)
+dynamicConfigPublisher.onMetadataUpdate(delta, newImage)
 // Apply client quotas delta.
-try {
-  Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
-    clientQuotaMetadataManager.update(clientQuotasDelta)
-  }
-} catch {
-  case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating client " +
-    s"quotas in $deltaName", t)
-}
+dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
-// Apply changes to SCRAM credentials.
-Option(delta.scramDelta()).foreach { scramDelta =>
-  scramDelta.changes().forEach {
-    case (mechanism, userChanges) =>
-      userChanges.forEach {
-        case (userName, change) =>
-          if (change.isPresent) {
-            credentialProvider.updateCredential(mechanism, userName, change.get().toCredential(mechanism))
-          } else {
-            credentialProvider.removeCredentials(mechanism, userName)
-          }
-      }
-  }
-}
+// APply SCRAM delta.

Review Comment: nit: `s/AP/Ap/`
[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
kirktrue commented on PR #13301: URL: https://github.com/apache/kafka/pull/13301#issuecomment-1462812863

@guozhangwang I've added some dedicated unit tests. Keep in mind that these classes are all still used by the `Fetcher`, and thus the extensive `FetcherTest` class covers them too. When running `FetcherTest` in code coverage mode, well over 95% of the code is covered by the existing tests already.
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698597#comment-17698597 ]

Jorge Esteban Quilcate Otoya commented on KAFKA-2967:
-----------------------------------------------------
[~tombentley], to alleviate the broken-links issue a bit, we could have some of the main headers on the index page reproduce the anchors, with the link to each section below. Something like this:

!https://raw.githubusercontent.com/jeqo/ak-docs/main/index_redirects.png!

> Move Kafka documentation to ReStructuredText
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
> Issue Type: Bug
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Priority: Major
> Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during the build using the Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-13891:
---------------------------------
Assignee: Philip Nee (was: Kirk True)

> sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.0
> Reporter: Shawn Wang
> Assignee: Philip Nee
> Priority: Major
> Fix For: 3.5.0
>
> This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419],
> but the previous PR forgot to reset the generation when the sync group failed with the
> rebalanceInProgress error. So the previous bug still exists, and it may cause the
> consumer to rebalance many rounds before becoming stable.
> Here's the example ({*}bold is added{*}):
> # Consumer A joined and synced the group successfully with generation 1 *(with ownedPartitions P1/P2)*
> # A new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A doesn't send out its sync group immediately
> # The other consumers completed the sync group successfully in generation 2, except consumer A
> # After consumer A sends out its sync group, a new rebalance starts with generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the sync group response
> # On receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartitions) from generation 1
> # So now we have sent out-of-date ownedPartitions, with unexpected results
> # *After the generation-3 rebalance, consumer A got partitions P3/P4; the ownedPartitions are ignored because of the old generation*
> # *Consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
> # *If some other consumer C fails to sync group before consumer A's join group, the same issue happens again, resulting in many rounds of rebalance before stabilizing*

--
This message was sent by Atlassian Jira (v8.20.10#820010)
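The fix implied by step 5 above, forgetting the stale generation (and with it the stale ownedPartitions) when the sync group fails with REBALANCE_IN_PROGRESS, can be modeled with a toy sketch. All names here are hypothetical; this is not the real `ConsumerCoordinator`:

```java
import java.util.Collections;
import java.util.List;

// Toy model of the discussed fix: on a REBALANCE_IN_PROGRESS sync-group error,
// the member resets its generation so stale ownedPartitions from an old
// generation are not re-sent on the next join.
public class GenerationResetSketch {
    static final int NO_GENERATION = -1;

    int generation = 1;
    List<String> ownedPartitions = List.of("P1", "P2");

    void onSyncGroupFailed(String error) {
        if ("REBALANCE_IN_PROGRESS".equals(error)) {
            // Forget the stale generation and its assignment before
            // re-joining, instead of re-sending generation-1 ownedPartitions.
            generation = NO_GENERATION;
            ownedPartitions = Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        GenerationResetSketch member = new GenerationResetSketch();
        member.onSyncGroupFailed("REBALANCE_IN_PROGRESS");
        System.out.println(member.generation + " " + member.ownedPartitions);
    }
}
```

With the reset in place, the generation-3 join in the example would carry an empty ownedPartitions list rather than the out-of-date P1/P2 assignment, avoiding the extra revoke/re-join round.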
[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698586#comment-17698586 ]

Kirk True commented on KAFKA-13891:
-----------------------------------
[~pnee] please let me know if you need help on this. Happy to help where I can.

> sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.0
> Reporter: Shawn Wang
> Assignee: Philip Nee
> Priority: Major
> Fix For: 3.5.0
[GitHub] [kafka] mumrah commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface
mumrah commented on code in PR #13337: URL: https://github.com/apache/kafka/pull/13337#discussion_r1131462269

## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
## @@ -449,6 +450,15 @@ SnapshotManifest loadSnapshot(
 public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
     eventQueue.append(() -> {
         currentLeaderAndEpoch = leaderAndEpoch;
+        for (MetadataPublisher publisher : publishers.values()) {
+            try {
+                publisher.onControllerChange(currentLeaderAndEpoch);
+            } catch (Throwable e) {
+                faultHandler.handleFault("Unhandled error publishing the new leader " +
+                    "change to " + currentLeaderAndEpoch + " with publisher " +
+                    publisher.name(), e);

Review Comment: nit; tabs

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
## @@ -223,16 +223,21 @@ public String name() { }
 @Override
-public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, SnapshotManifest manifest) {
-    enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(), true, NO_OP_HANDLER);
+public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
+    eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
 }
 @Override
-public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, LogDeltaManifest manifest) {
-    if (!leaderAndEpoch.equals(manifest.leaderAndEpoch())) {
-        eventQueue.append(new KRaftLeaderEvent(manifest.leaderAndEpoch()));
-    }
-    enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(), false, NO_OP_HANDLER);
+public void onMetadataUpdate(
+    MetadataDelta delta,
+    MetadataImage newImage,
+    LoaderManifest manifest
+) {
+    enqueueMetadataChangeEvent(delta,
+        newImage,

Review Comment: nit: tabs

## metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java:
## @@ -200,7 +201,24 @@ void resetSnapshotCounters() { }
 @Override
-public void publishSnapshot(
+public void onMetadataUpdate(
+    MetadataDelta delta,
+    MetadataImage newImage,
+    LoaderManifest manifest
+) {
+    switch (manifest.type()) {
+        case LOG_DELTA:
+            publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+            break;
+        case SNAPSHOT:
+            publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+            break;
+        default:
+            break;

Review Comment: wdyt about throwing an IllegalStateException in the `default`. I do that sometimes to future-proof against unhandled enums down the road.

## metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java:
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+
+/**
+ * Contains information about what was loaded.
+ */
+public interface LoaderManifest {

Review Comment: Ok, fair enough. Works for me 👍
[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
[ https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698128#comment-17698128 ] Greg Harris edited comment on KAFKA-14666 at 3/9/23 6:29 PM: - These solutions must: 1. Provide correct translation of offsets (checkpoints can _never_ be ahead of the ideal checkpoint) 2. Provide best-effort translation of offsets (checkpoints can _sometimes_ be behind the ideal checkpoint) 3. Ensure that checkpoints are monotonic (if the upstream group does not rewind offsets, then later checkpoints must always be for higher offsets than earlier checkpoints) 4. Provide translation of offsets prior to the latest offset sync (earlier non-compacted offset syncs can influence translation) 5. Prioritize accuracy for consumer groups near the end of the replicated topic (the difference between a checkpoint and the ideal checkpoint for earlier offsets may be larger than later offsets) I explored some of the solution space, I think that there are some different designs that might be usable to solve this use-case, of varying viability: 1. Store fine-grained offset syncs for some bounded window (store every offset sync within max.offset.lag of the latest one) 2. Store coarse-grained offset syncs for an unbounded window (store the first offset after each power of 2 offset, biased towards the end of the topic) 3. Store fine-grained offset syncs for an unbounded window (keep the whole offset syncs topic in-memory) 4. Re-read the complete offset syncs topic periodically and use the offset sync which most closely precedes the consumer group offset. 5. Maintain an in-memory index from source group-topic-partition to offset-sync-topic offsets, and seek a single consumer over the offset-sync topic to lookup the relevant offset sync information. 
(like having 1 offset sync consumer per consumer group partition, but multiplexed into one physical consumer per checkpoint task) Solutions 1, 2, and 3 only read the offset syncs topic once on startup, at the cost of some additional memory to store some or all of the offset syncs topic. Solutions 4 and 5 re-read the topic to avoid storing additional topic contents, with either no or minimal in-memory storage. Solution 5 would re-read the topic on startup and after upstream consumer group rewinds, and read the topic once for each group-topic-partition in steady state. Solutions 1, 2, and 3 are less significant architecture changes. Solutions 4 and 5 are significant reimplementations of the translation logic, which would need to stop storing offset syncs topic contents and start storing state per translated group-topic-partition. Solutions 3, 4, and 5, which offer "perfect" translation, do not have a significant advantage over the lossy solutions 1 and 2: since the offset syncs topic is compacted, we can at most provide best-effort translation. Solution 1 does not provide translation of arbitrarily old offsets, but lets users configure the translatable window to one acceptable for the live workload. Solutions 2, 3, 4, and 5 allow translation for arbitrarily old offsets, where the order of compaction may affect monotonicity guarantees. These solutions would need to read their own checkpoints topic to determine whether the translated offset would be monotonic. None of these options appears clearly better than the others; they will need further discussion and evaluation.
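Requirements 1 (never ahead of the ideal checkpoint) and 3 (monotonic) above can be illustrated with a small standalone sketch. This is hypothetical code, not MirrorMaker's actual implementation: it translates an upstream group offset using the offset sync that most closely precedes it, then clamps emitted checkpoints so they never move backwards.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the translation invariants discussed above. In
// practice there would be one checkpoint stream per group-topic-partition;
// this models a single one.
class CheckpointTranslator {
    // upstream offset -> downstream offset, one entry per stored offset sync
    private final TreeMap<Long, Long> syncs = new TreeMap<>();
    private long lastCheckpoint = -1L;

    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Returns a downstream checkpoint for the upstream group offset, or -1 if
    // no offset sync precedes it.
    long translate(long upstreamGroupOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamGroupOffset);
        if (sync == null) {
            return -1L;
        }
        // Requirement 1: the sync's upstream offset is <= the group offset, so
        // its downstream offset can never be ahead of the ideal checkpoint.
        long candidate = sync.getValue();
        // Requirement 3: never emit a checkpoint lower than an earlier one.
        lastCheckpoint = Math.max(candidate, lastCheckpoint);
        return lastCheckpoint;
    }
}
```

The coarser the stored syncs (solutions 1 and 2), the larger the gap between the emitted checkpoint and the ideal one, which is why the comment prioritizes accuracy near the end of the topic.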
[GitHub] [kafka] cmccabe merged pull request #13370: MINOR: Add unclean field of PartitionReassignmentRevert to hashCode equals and toString
cmccabe merged PR #13370: URL: https://github.com/apache/kafka/pull/13370 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface
cmccabe commented on PR #13337: URL: https://github.com/apache/kafka/pull/13337#issuecomment-1462530266 I fixed the case where I was invoking `DynamicConfigPublisher.publish` with a null. This was a bit silly... I was just being lazy about adding an extra function. I also fixed it so that `BrokerMetadataPublisher` is using `DynamicClientQuotaPublisher` and `ScramPublisher`. Ultimately this code should be the same on the broker and controller, just invoked slightly differently. (I have a follow-up change which will use Loader on the broker too, which will simplify this more.)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface
cmccabe commented on code in PR #13337: URL: https://github.com/apache/kafka/pull/13337#discussion_r1131402040 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -213,7 +213,7 @@ class BrokerMetadataPublisher( } // Apply configuration deltas. - dynamicConfigPublisher.publish(delta, newImage) + dynamicConfigPublisher.publish(delta, newImage, null) Review Comment: fixed ## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ## @@ -31,10 +32,16 @@ class DynamicConfigPublisher( faultHandler: FaultHandler, dynamicConfigHandlers: Map[String, ConfigHandler], nodeType: String -) extends Logging { +) extends org.apache.kafka.image.publisher.MetadataPublisher with Logging { logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] " - def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { + def name(): String = "DynamicConfigPublisher" + + def publish( +delta: MetadataDelta, +newImage: MetadataImage, +manifest: LoaderManifest Review Comment: fixed
[GitHub] [kafka] mumrah opened a new pull request, #13372: MINOR: Improved error handling in ZK migration
mumrah opened a new pull request, #13372: URL: https://github.com/apache/kafka/pull/13372 This patch fixes many small issues to improve error handling and logging during the ZK migration. A test was added to simulate a ZK session expiration to ensure the correctness of the migration driver. With this change, ZK errors thrown during the migration will not hit the fault handler registered with KRaftMigrationDriver, but they will be logged.
[GitHub] [kafka] CalvinConfluent commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
CalvinConfluent commented on PR #13323: URL: https://github.com/apache/kafka/pull/13323#issuecomment-1462489865 @dajac Done.
[GitHub] [kafka] mimaison commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
mimaison commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1131358290 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map<String, String> envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map<String, ?> configs) { +} Review Comment: You can make a similar point about the other ConfigProvider implementations. For example FileConfigProvider can be used to read arbitrary files.
It would be good to have a KIP to allow restricting the providers, for example restricting FileConfigProvider to a directory, or forcing a prefix for environment variables. Since providers are not enabled by default, I don't think this is required to merge this.
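The restriction idea discussed above can be sketched in a few lines. This is a hypothetical standalone class, not the PR's implementation: it only exposes environment variables whose names carry a configured allow-prefix, so a provider cannot leak arbitrary variables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of prefix-based restriction for an env-var provider.
// Names and structure are illustrative only.
class PrefixedEnvVarResolver {
    private final Map<String, String> envVars;
    private final String allowedPrefix;

    PrefixedEnvVarResolver(Map<String, String> envVars, String allowedPrefix) {
        this.envVars = envVars; // e.g. System.getenv() in real use
        this.allowedPrefix = allowedPrefix;
    }

    // Return only the requested variables that start with the allowed prefix.
    Map<String, String> resolve(Set<String> keys) {
        Map<String, String> out = new HashMap<>();
        for (String key : keys) {
            if (key.startsWith(allowedPrefix) && envVars.containsKey(key)) {
                out.put(key, envVars.get(key));
            }
        }
        return out;
    }
}
```

A pattern-based variant (as in the PR's `ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG`) would swap the `startsWith` check for a regex match.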
[GitHub] [kafka] dajac commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
dajac commented on PR #13323: URL: https://github.com/apache/kafka/pull/13323#issuecomment-1462452965 @CalvinConfluent Could you rebase the PR? There are a few conflicts.
[GitHub] [kafka] bbejeck commented on pull request #13371: MINOR: Updating video links to ASF YouTube
bbejeck commented on PR #13371: URL: https://github.com/apache/kafka/pull/13371#issuecomment-1462430869 Merged #13371 into trunk
[GitHub] [kafka] bbejeck merged pull request #13371: MINOR: Updating video links to ASF YouTube
bbejeck merged PR #13371: URL: https://github.com/apache/kafka/pull/13371
[jira] [Resolved] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics
[ https://issues.apache.org/jira/browse/KAFKA-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Waleed Fateem resolved KAFKA-6301. -- Resolution: Won't Fix Really old and irrelevant at this point. > Incorrect Java Regex example '*' for mirroring all topics > - > > Key: KAFKA-6301 > URL: https://issues.apache.org/jira/browse/KAFKA-6301 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 0.10.2.0, 0.11.0.0, 1.0.0 >Reporter: Waleed Fateem >Assignee: Waleed Fateem >Priority: Minor > Labels: documentation, mirror-maker > > The documentation for section "Mirroring data between clusters" states the > following: > Or you could mirror all topics using --whitelist '*' > The regular expression should be '.*' instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
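The documentation bug above comes down to one line of Java regex behavior: `'*'` on its own is a dangling quantifier and is rejected by `Pattern.compile`, while `'.*'` matches every topic name. A minimal demonstration:

```java
import java.util.regex.Pattern;

// '*' is not a valid Java regex by itself (dangling quantifier); '.*'
// matches all topic names, which is why the docs needed correcting.
class WhitelistRegexDemo {
    static boolean matches(String pattern, String topic) {
        return Pattern.compile(pattern).matcher(topic).matches();
    }
}
```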
[GitHub] [kafka] bbejeck commented on pull request #13371: MINOR: Updating video links to ASF YouTube
bbejeck commented on PR #13371: URL: https://github.com/apache/kafka/pull/13371#issuecomment-1462387972 ping one of @mimaison , @mjsax , @cadonna for review
[GitHub] [kafka] bbejeck opened a new pull request, #13371: MINOR: Updating video links to ASF YouTube
bbejeck opened a new pull request, #13371: URL: https://github.com/apache/kafka/pull/13371 Mirror PR for https://github.com/apache/kafka-site/pull/495 in site docs *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1131302858 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -41,6 +46,14 @@ public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) { @Override public void configure(Map<String, ?> configs) { +if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) { Review Comment: Done, thanks for the hint!
[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1131296480 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.AbstractConfig; Review Comment: Resolved as I merged it into the main class according to the suggestion of @mimaison
[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on PR #12992: URL: https://github.com/apache/kafka/pull/12992#issuecomment-1462373253 > > > Thanks for the PR! I left a suggestion > > > > > > Yes, concerning the separate class I was thinking the same to be honest, also for the static strings for the properties/docs definitions. I don't see a reference implementation for other providers as they are not configurable but happy to hear any suggestions. > > Unfortunately we don't have a mechanism for documenting this type of plugins. I think it's fine to keep the doc as an unused constant or simply as a comment for now. Ok, merged it into the provider class.
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1131241761 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: Sounds good to me. Thanks!
[jira] [Commented] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction
[ https://issues.apache.org/jira/browse/KAFKA-14799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698469#comment-17698469 ] Chris Egerton commented on KAFKA-14799: --- Right now I'm leaning toward option 1, probably with a {{{}WARN{}}}-level log message. > Source tasks fail if connector attempts to abort empty transaction > -- > > Key: KAFKA-14799 > URL: https://issues.apache.org/jira/browse/KAFKA-14799 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > If a source task invokes > [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()] > while the current transaction is empty, and then returns an empty batch of > records from the next (or current) invocation of {{{}SourceTask::poll{}}}, > the task will fail. > This is because the Connect framework will honor the transaction abort > request by invoking > [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()], > but without having first invoked > [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()] > (since no records had been received from the task), which leads to an > {{{}IllegalStateException{}}}. > An example stack trace for this scenario: > {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] > ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an > uncaught and unrecoverable exception. 
Task is being killed and will not > recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask:210) > java.lang.IllegalStateException: TransactionalId > exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid > transition attempted from state READY to state ABORTING_TRANSACTION > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967) > at > org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269) > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266) > at > org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835) > at > org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495) > at > org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473) > at > org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398) > at > org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75) > at > 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > {quote} > > As far as a fix goes, we have a few options: > # Gracefully handle this case by translating the call to > {{TransactionContext::abortTransaction}} into a no-op > # Throw an exception (probably an {{{}IllegalStateException{}}}) from > {{{}TransactionContext::abortTransaction{}}}, which may fail the task, but > would give it the option to swallow the exception and continue processing if > it would like > # Forcibly fail the task without giving it the chance to swallow an > exception, using a similar strategy to how we fail tasks that request that a > transaction be committed and aborted for the same record (see > [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
[jira] [Created] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction
Chris Egerton created KAFKA-14799: - Summary: Source tasks fail if connector attempts to abort empty transaction Key: KAFKA-14799 URL: https://issues.apache.org/jira/browse/KAFKA-14799 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Chris Egerton Assignee: Chris Egerton If a source task invokes [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()] while the current transaction is empty, and then returns an empty batch of records from the next (or current) invocation of {{{}SourceTask::poll{}}}, the task will fail. This is because the Connect framework will honor the transaction abort request by invoking [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()], but without having first invoked [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()] (since no records had been received from the task), which leads to an {{{}IllegalStateException{}}}. An example stack trace for this scenario: {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210) java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967) at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269) at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116) at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266) at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835) at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495) at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473) at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398) at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) {quote} As far as a fix goes, we have a few options: # Gracefully handle this case by translating the call to {{TransactionContext::abortTransaction}} into a no-op # Throw an exception (probably an {{{}IllegalStateException{}}}) from {{{}TransactionContext::abortTransaction{}}}, which may fail the task, but would give it the option to swallow the exception and continue processing if it would like # Forcibly fail the task without giving it the chance to swallow an exception, using a similar strategy to how we fail tasks that request that a transaction be committed and aborted for the same record (see [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
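Fix option 1 above can be sketched with a small stand-in for the Connect transaction machinery. This is hypothetical code, not Kafka's: the point is that the context only calls the producer's `abortTransaction()` if `beginTransaction()` actually ran, turning an abort of an empty transaction into a logged no-op instead of an `IllegalStateException`.

```java
// Hypothetical stand-in for the producer's transactional API, not KafkaProducer.
interface TxProducer {
    void beginTransaction();
    void abortTransaction();
}

// Sketch of option 1: track whether a transaction was actually opened and
// skip the producer call for an empty abort.
class SafeTransactionContext {
    private final TxProducer producer;
    private boolean transactionOpen = false; // set once beginTransaction() runs

    SafeTransactionContext(TxProducer producer) {
        this.producer = producer;
    }

    // Called when the task produces its first record of a batch.
    void recordSent() {
        if (!transactionOpen) {
            producer.beginTransaction();
            transactionOpen = true;
        }
    }

    void abortTransaction() {
        if (!transactionOpen) {
            // No records yet, so there is nothing to abort; warn and return
            // instead of letting the producer throw IllegalStateException.
            System.err.println("WARN: ignoring abort of empty transaction");
            return;
        }
        producer.abortTransaction();
        transactionOpen = false;
    }
}
```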
[GitHub] [kafka] andymg3 opened a new pull request, #13370: MINOR: Add unclean field of PartitionReassignmentRevert to hashCode Equals and toString
andymg3 opened a new pull request, #13370: URL: https://github.com/apache/kafka/pull/13370 ### Details It looks like we forgot to add the `unclean` field of `PartitionReassignmentRevert` to a few methods. So adding it here. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1131139080 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: Sounds good. Thanks for your guidance. As you mentioned, this PR is already quite large, so if you agree, I will go ahead and implement this change first, in a PR of its own. Thanks!
[GitHub] [kafka] mimaison commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
mimaison commented on PR #12992: URL: https://github.com/apache/kafka/pull/12992#issuecomment-1462175567 > > Thanks for the PR! I left a suggestion > > Yes, concerning the separate class I was thinking the same to be honest, also for the static strings for the properties/docs definitions. I don't see a reference implementation for other providers as they are not configurable but happy to hear any suggestions. Unfortunately we don't have a mechanism for documenting this type of plugins. I think it's fine to keep the doc as an unused constant or simply as a comment for now.
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1131114261 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: How about doing the following? We change the signature of `GroupCoordinator.handleCommitOffsets` to the following:
```
def handleCommitOffsets(groupId: String,
                        memberId: String,
                        groupInstanceId: Option[String],
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
                        requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
```
Note the change from `TopicPartition` to `TopicIdPartition` for `offsetMetadata` and `responseCallback`. Then, we have to adapt the implementation of `handleCommitOffsets` to get the `TopicPartition` from the `TopicIdPartition` where required. We can keep `pendingOffsetCommits` and `offsets` keyed by `TopicPartition` for now in `GroupMetadataManager`. This allows the preservation of the topic ids provided to the GroupCoordinator but it does not provide any stronger guarantee for the offsets yet (as you pointed out). With this approach, we don't depend on the resolver at all.
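The suggested shape, an API keyed by `TopicIdPartition` with internal state still keyed by `TopicPartition`, can be sketched with hypothetical stand-in types (these are not Kafka's own classes; the real ones live in `org.apache.kafka.common`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-ins for the types discussed above.
record TopicPartition(String topic, int partition) {}
record TopicIdPartition(UUID topicId, TopicPartition topicPartition) {}

class OffsetCommitSketch {
    // Internal state stays keyed by topic name, as in GroupMetadataManager today.
    private final Map<TopicPartition, Long> offsets = new HashMap<>();

    // API keyed by TopicIdPartition: callers pass the ids they resolved at
    // request-handling time, so no second metadata lookup happens here.
    Map<TopicIdPartition, String> commit(Map<TopicIdPartition, Long> toCommit) {
        Map<TopicIdPartition, String> result = new HashMap<>();
        toCommit.forEach((tip, offset) -> {
            offsets.put(tip.topicPartition(), offset); // drop to name-keyed storage
            result.put(tip, "NONE"); // per-partition error string, keyed by id
        });
        return result;
    }

    Long committed(TopicPartition tp) {
        return offsets.get(tp);
    }
}
```

As the comment notes, this preserves the ids across the call boundary without yet giving the stored offsets any id-based guarantee.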
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1131047710 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: Thanks for the answer. If I understand correctly, we would then have a resolution of topic ids from topic-name-based persisted data, so this may not prevent offsets from a topic to be provided as those of another topic with the same name (defined at different point in time in the server)? The resolution can be done in the group coordinator layer, assuming it has access to the topic id resolved upstream by the request handler. Because we want to preserve the same mapping used when request processing started, we need to ensure the right ids are used within the adaptor's `GroupCoordinator#commitOffsets` method(). Since the mapping returned from the metadata cache depends on the snapshot used at the time the mapping is requested, if the adaptor retrieves it from the metadata cache internally, at a different time from the request handler, there is no guarantee the metadata is the same hence that the topic IDs registered with the broker are the same. This means that the topic ids need to be propagated from the request handler (`KafkaApis`) to the coordinator adaptor somehow. Without a change in the method and contract implemented by the coordinator, these ids could be transferred via the `OffsetCommitRequestData` DTO directly, which means a change in the API schema would be required prior to the change. 
Alternatively, we may want to change the interface of the coordinator and change the signature of the offset commit method to allow for the propagation of topic ids. I may be missing the entire thing though? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
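The snapshot-consistency concern discussed in the comment above can be illustrated with a small standalone sketch. All names here (`TopicSnapshot`, `CoordinatorAdaptor`) are hypothetical, not Kafka's actual classes: the idea is only that the name-to-id mapping is captured once by the request handler and handed down, so a later metadata update cannot change which topic id the offsets are attributed to.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch: capture the name -> id mapping once, at request-handling
// time, and pass that immutable snapshot down so both layers agree on the ids.
final class TopicSnapshot {
    private final Map<String, UUID> idsByName;

    TopicSnapshot(Map<String, UUID> idsByName) {
        // Copy defensively: later metadata-cache updates cannot alter this snapshot.
        this.idsByName = Map.copyOf(idsByName);
    }

    Optional<UUID> idFor(String topicName) {
        return Optional.ofNullable(idsByName.get(topicName));
    }
}

final class CoordinatorAdaptor {
    // The snapshot travels with the request instead of being re-read from the
    // metadata cache, so a concurrent topic re-creation cannot swap the id.
    String commitOffsets(TopicSnapshot snapshot, String topicName) {
        return snapshot.idFor(topicName)
                .map(id -> "committed " + topicName + " (id=" + id + ")")
                .orElse("UNKNOWN_TOPIC_ID");
    }
}
```

Passing the snapshot explicitly is one way to express the "propagate the ids from `KafkaApis` to the adaptor" option; encoding the ids into the request DTO, as the comment suggests, would achieve the same consistency property.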
[GitHub] [kafka] urbandan commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
urbandan commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1130991899 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBackingStore.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBackingStore { +private static final Logger log = LoggerFactory.getLogger(KafkaTopicBackingStore.class); + +Consumer topicInitializer(String topic, NewTopic topicDescription, WorkerConfig config, Time time) { +return admin -> { +log.debug("Creating admin client to manage Connect internal {} topic", getTopicPurpose()); Review Comment: nit: I know this was copied over from the old method, but this doesn't seem accurate, can you please fix it? ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBackingStore.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBackingStore { Review Comment: nit: KafkaTopicStore or KafkaTopicBackedStore would be more descriptive ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +331,41 @@ public Set createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ +public Set createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { +Timer timer = time.timer(timeoutMs); +do { +try { +return createTopics(topicDescription); +} catch (ConnectException e) { +if (timer.notExpired() && retryableTopicCreationException(e)) { +
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1130942769 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: The OffsetCommitValue part is not possible at the moment because we don’t have a way to downgrade. My colleague @jeffkbkim is working on a proposal for this. We could start by either migrating from using TopicPartition to using TopicIdPartition, or by handling this in the GroupCoordinatorAdaptor layer. The former is likely simpler.
[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on PR #12992: URL: https://github.com/apache/kafka/pull/12992#issuecomment-1461890141 > Thanks for the PR! I left a suggestion Yes, I was thinking the same to be honest, also about the static strings for the properties/docs definitions. I don't see a reference implementation in the other providers as they are not configurable, but I'm happy to hear any suggestions.
[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698327#comment-17698327 ] Sagar Rao commented on KAFKA-13295: --- Had a discussion with [~guozhang] and the suggestion is to wait for the 4.0 release, when EOS-v1 would be deprecated. It would be simpler to implement the changes in EOS-v2. > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Critical > Labels: eos, new-streams-runtime-should-fix > Fix For: 4.0.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn’t already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won’t actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-13295: -- Fix Version/s: 4.0.0 (was: 3.5.0) > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Critical > Labels: eos, new-streams-runtime-should-fix > Fix For: 4.0.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn’t already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won’t actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14798) corrupted consumer offsets
[ https://issues.apache.org/jira/browse/KAFKA-14798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aravind updated KAFKA-14798: Description: Kafka version 3.2.3 The consumer offsets for some partitions in a kafka topic are not refreshing after a restart of consumer application which made negative lag accumulating and the messages in the partitions are not consumed by the application until the log-end offset equals to older consumer offset. {noformat} 2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-dummy-1, groupId=dummy] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, dummy-2] org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined 2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exceptionjava.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.TimeoutException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212) ~[spring-kafka-2.7.9.jar!/:2.7.9] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na] at 
java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na] at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined {noformat} Getting "no record information available" consumer exception as the FetchPosition is pointing to old committed offset. was: Kafka version 3.2.3 The consumer offsets for some partitions in a kafka topic are not refreshing after a restart of consumer application which made negative lag accumulating and the messages in the partitions are not consumed by the application until the log-end offset equals to older consumer offset. {noformat} 2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-dummy-1, groupId=dummy] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, dummy-2] org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined 2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exceptionjava.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.TimeoutException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604) ~[spring-kafka-2.7.9.jar!/:2.7.9] at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212) ~[spring-kafka-2.7.9.jar!/:2.7.9] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na] at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na] at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined {noformat} Getting "no record information available" consumer exception as the FetchPosition is pointing to old committed offset. > corrupted consumer offsets > -- > > Key: KAFKA-14798 > URL: https://issues.apache.org/jira/browse/KAFKA-14798 > Project: Kafka > Issue Typ
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1130866578 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: Hi David, thanks for the insight. I think you are right that implementing support for topic ids in the functional layer before exposing it in the API makes sense, as it provides the guarantee that offsets and metadata belong to the partitions of the right topic in case of homonyms. Now, one question is how deeply we integrate ids in this layer. Would you consider changing the data model authored by the group coordinator down to the `OffsetCommitValue` as prescribed by KIP 848?
[jira] [Created] (KAFKA-14798) corrupted consumer offsets
Aravind created KAFKA-14798: --- Summary: corrupted consumer offsets Key: KAFKA-14798 URL: https://issues.apache.org/jira/browse/KAFKA-14798 Project: Kafka Issue Type: Bug Reporter: Aravind Kafka version 3.2.3 The consumer offsets for some partitions in a kafka topic are not refreshing after a restart of consumer application which made negative lag accumulating and the messages in the partitions are not consumed by the application until the log-end offset equals to older consumer offset. {noformat} 2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-dummy-1, groupId=dummy] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, dummy-2] org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined 2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exceptionjava.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.TimeoutException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604) ~[spring-kafka-2.7.9.jar!/:2.7.9] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212) ~[spring-kafka-2.7.9.jar!/:2.7.9] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na] at 
java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na] at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined {noformat} Getting "no record information available" consumer exception as the FetchPosition is pointing to old committed offset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
mimaison commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1130786389 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.AbstractConfig; Review Comment: I wonder if we really need a separate class for this single config. Could we move it to `EnvVarConfigProvider`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
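As background for the review discussion, the core of an environment-variable config provider is just a filtered view over the process environment. A minimal standalone sketch follows; it is illustrative only (`EnvVarLookup` is not the PR's actual `EnvVarConfigProvider`), and the map is injected instead of calling `System.getenv()` so the behavior can be tested deterministically.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch: expose environment variables as config values,
// optionally restricted to an explicit set of keys.
final class EnvVarLookup {
    private final Map<String, String> env;

    EnvVarLookup(Map<String, String> env) {
        this.env = env; // real code would default to System.getenv()
    }

    // An empty key set means "return everything".
    Map<String, String> get(Set<String> keys) {
        return env.entrySet().stream()
                .filter(e -> keys.isEmpty() || keys.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

In this shape, a configuration option such as an allowlist of variable names is a single field, which is why folding it into the provider class itself (as the reviewer suggests) is plausible.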
[GitHub] [kafka] dajac merged pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
dajac merged PR #12813: URL: https://github.com/apache/kafka/pull/12813
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1130671265 ## clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMapping.java: ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import org.apache.kafka.common.errors.InvalidTopicException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between + * a name and an id. + * + * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but + * not to characterize the set of topics which are known by a client. Use the + * {@link org.apache.kafka.clients.MetadataCache} for that purpose. + */ +public class TopicIdAndNameBiMapping { +private final Map topicIds; +private final Map topicNames; + +/** + * A mapping whose universe of topic ids and names is captured from the input map. The reverse association + * between a topic ID and a topic name is computed by this method. 
If more than one topic name + * resolves to the same topic ID, an {@link InvalidTopicException} is thrown. + */ +public static TopicIdAndNameBiMapping fromTopicIds(Map topicIds) { Review Comment: Both names are fine for me. I leave it up to you.
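The construction described in the javadoc above — deriving the reverse id-to-name association from a name-to-id map and rejecting inputs where two names resolve to the same id — can be sketched roughly as follows. This is an illustration under stated substitutions, not the PR's actual class: `String` stands in for Kafka's `Uuid`, and a plain `IllegalArgumentException` for `InvalidTopicException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative bimap sketch: builds the reverse id -> name map and
// fails fast on duplicate ids, mirroring the fromTopicIds contract
// quoted in the review above.
final class TopicBiMapSketch {
    private final Map<String, String> topicIds;   // name -> id
    private final Map<String, String> topicNames; // id -> name

    private TopicBiMapSketch(Map<String, String> topicIds, Map<String, String> topicNames) {
        this.topicIds = Collections.unmodifiableMap(topicIds);
        this.topicNames = Collections.unmodifiableMap(topicNames);
    }

    public static TopicBiMapSketch fromTopicIds(Map<String, String> topicIds) {
        Map<String, String> topicNames = new HashMap<>(topicIds.size());
        for (Map.Entry<String, String> e : topicIds.entrySet()) {
            // put returns the previous name for this id, if any:
            // a non-null result means two names share one id.
            String previous = topicNames.put(e.getValue(), e.getKey());
            if (previous != null) {
                throw new IllegalArgumentException(
                    "Topic id " + e.getValue() + " is mapped to both " + previous + " and " + e.getKey());
            }
        }
        return new TopicBiMapSketch(new HashMap<>(topicIds), topicNames);
    }

    public String idFor(String name) { return topicIds.get(name); }
    public String nameFor(String id) { return topicNames.get(id); }
}
```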
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1130670740 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } +public short version() { +return version; +} + public static class Builder { OffsetCommitResponseData data = new OffsetCommitResponseData(); HashMap byTopicName = new HashMap<>(); +private final TopicResolver topicResolver; +private final short version; + +public Builder(TopicResolver topicResolver, short version) { Review Comment: I actually wonder if we should do it the other way around. We could do KAFKA-14793 first, merge it, and update this one accordingly. Without KAFKA-14793, the contract is not really respected and it feels a bit weird to work around it here instead of fixing the real issue. Is KAFKA-14793 complicated? What do you think?
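The diff under discussion threads a resolver and the request version through the response builder so that topics can be keyed by id on newer versions and by name on older ones. A hypothetical sketch of that version-gated shape — the `addTopic` method, the `Function`-based stand-in for `TopicResolver`, and the v9 cutover are illustrative assumptions, not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a builder that keys topics by id for request
// versions carrying topic ids (v9+ for OffsetCommit) and by name for
// older versions.
class VersionedResponseBuilderSketch {
    private final Function<String, String> nameToId; // stand-in for TopicResolver
    private final short version;
    private final Map<String, String> byTopicKey = new HashMap<>();

    public VersionedResponseBuilderSketch(Function<String, String> nameToId, short version) {
        this.nameToId = nameToId;
        this.version = version;
    }

    // For v9+ the wire key is the topic id; otherwise it is the name.
    public VersionedResponseBuilderSketch addTopic(String name, String payload) {
        String key = version >= 9 ? nameToId.apply(name) : name;
        byTopicKey.put(key, payload);
        return this;
    }

    public Map<String, String> build() {
        return byTopicKey;
    }
}
```

The reviewer's point is about ordering the work, not this shape itself: until KAFKA-14793 lands, the builder has to compensate for a contract that is not actually upheld by its callers.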