[GitHub] [kafka] yashmayya opened a new pull request, #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode

2023-03-09 Thread via GitHub


yashmayya opened a new pull request, #13375:
URL: https://github.com/apache/kafka/pull/13375

   - The existing `EmbeddedConnectCluster` (used in Connect integration tests) 
uses a backing `EmbeddedKafkaCluster` which internally also spins up an 
`EmbeddedZookeeper`.
   - This patch migrates the `EmbeddedKafkaCluster` to run in KRaft mode, 
leveraging the existing 
[KafkaClusterTestKit](https://github.com/apache/kafka/blob/5f6a050bfee09b634497f9ba35e2964289be1e4d/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L80)
 from `core`.
   - Connect / MirrorMaker integration tests which set up a Kafka cluster with 
`SASL_PLAINTEXT` or `SSL` listeners needed to be updated to account for the 
controller listeners as well, and to switch the authorizer used (from 
`kafka.security.authorizer.AclAuthorizer` to 
`org.apache.kafka.metadata.authorizer.StandardAuthorizer`, see 
[KIP-801](https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata))
 since the old authorizer relied on ZooKeeper.
   - The existing `EmbeddedKafkaCluster` had some logic to restart brokers and 
have them listen on the same ports as before (in order to verify Connect's 
functionality when its backing Kafka cluster goes down and then comes back up). 
This was refactored to move the responsibility of using a fixed port in the 
broker's listeners config to the tests themselves.
   - Some changes were made to `KafkaClusterTestKit` to allow externally 
configuring listeners, along with other minor improvements.
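   For context, the authorizer swap described above corresponds to broker 
settings along these lines — a minimal sketch using the config key and 
authorizer class names from the description, but with illustrative listener 
names (the actual test setup lives in the PR):

```java
import java.util.Properties;

public class KraftAuthorizerConfigSketch {
    // Builds the KRaft-mode, security-related broker properties described above.
    public static Properties brokerSecurityConfig() {
        Properties props = new Properties();
        // KIP-801: StandardAuthorizer stores ACLs in the cluster metadata log
        // instead of ZooKeeper, replacing the ZooKeeper-backed AclAuthorizer.
        props.setProperty("authorizer.class.name",
                "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
        // In KRaft mode the controller listener must be configured (and
        // secured) alongside the broker listeners. "CONTROLLER" and
        // "EXTERNAL" are illustrative listener names, not from the PR.
        props.setProperty("controller.listener.names", "CONTROLLER");
        props.setProperty("listener.security.protocol.map",
                "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        return props;
    }
}
```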
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-10807) AlterConfig should be validated by the target broker

2023-03-09 Thread Vinoth (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth reassigned KAFKA-10807:
--

Assignee: Vinoth

> AlterConfig should be validated by the target broker
> 
>
> Key: KAFKA-10807
> URL: https://issues.apache.org/jira/browse/KAFKA-10807
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Vinoth
>Priority: Major
>
> After forwarding is enabled, AlterConfigs will no longer be sent to the 
> target broker. This behavior bypasses some important config change 
> validations, such as path existence and static config conflicts; even worse, 
> when the target broker is offline, the propagated result does not reflect a 
> truly applied result. We should gather the necessary cases, and decide 
> whether to actually handle the AlterConfig request first on the target 
> broker, and then forward, in a validate-forward-apply path.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema

2023-03-09 Thread Vinoth (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth resolved KAFKA-10657.

Resolution: Resolved

This has been fixed in https://issues.apache.org/jira/browse/KAFKA-10525

> Incorporate Envelope into auto-generated JSON schema
> 
>
> Key: KAFKA-10657
> URL: https://issues.apache.org/jira/browse/KAFKA-10657
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Vinoth
>Priority: Major
>
> We need to add support for outputting the embedded request inside an 
> Envelope in JSON format, to do better request logging.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema]





[GitHub] [kafka] showuon commented on a diff in pull request #10244: KAFKA-12399: Deprecate Log4J Appender

2023-03-09 Thread via GitHub


showuon commented on code in PR #10244:
URL: https://github.com/apache/kafka/pull/10244#discussion_r1131920152


##
config/tools-log4j2.properties:
##
@@ -12,14 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+name=ToolsConfig
+status=OFF
 
-# Define the root logger with appender file
-log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
+appenders=stderr
 
-log4j.appender.FILE=org.apache.log4j.FileAppender
-log4j.appender.FILE.File={{ log_file }}
-log4j.appender.FILE.ImmediateFlush=true
-# Set the append to false, overwrite
-log4j.appender.FILE.Append=false
-log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
+appender.stderr.type=Console
+appender.stderr.name=STDERR
+appender.stderr.layout.type=PatternLayout
+appender.stderr.layout.pattern=[%d] %p %m (%c)%n
+
+rootLogger.level=WARN

Review Comment:
   The original default is "INFO"; why do we change it to WARN?



##
docs/upgrade.html:
##
@@ -25,6 +25,195 @@ Notable changes in 3
 which meant that idempotence remained disabled unless the user had 
explicitly set enable.idempotence to true
(See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a> for 
more details).
 This issue was fixed and the default is properly applied in 3.0.1, 
3.1.1, and 3.2.0.
+
+The log4j 1.x based log4j-appender is now deprecated and no 
longer recommended. This artifact will be released for a while, but will be 
removed at the next major release.

Review Comment:
   If we decided to add this in v3.5, this upgrade doc should also move to 3.5.






[jira] [Commented] (KAFKA-14776) Update SCRAM system tests to run with KRaft

2023-03-09 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698719#comment-17698719
 ] 

Proven Provenzano commented on KAFKA-14776:
---

[PR #13374|https://github.com/apache/kafka/pull/13374] includes both the 
KIP-900 implementation and the updated system tests.

> Update SCRAM system tests to run with KRaft
> ---
>
> Key: KAFKA-14776
> URL: https://issues.apache.org/jira/browse/KAFKA-14776
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>
> I will update the SCRAM system tests to run under both ZK and KRaft quorum 
> mode.





[jira] [Assigned] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema

2023-03-09 Thread Vinoth (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth reassigned KAFKA-10657:
--

Assignee: Vinoth

> Incorporate Envelope into auto-generated JSON schema
> 
>
> Key: KAFKA-10657
> URL: https://issues.apache.org/jira/browse/KAFKA-10657
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Vinoth
>Priority: Major
>
> We need to add support for outputting the embedded request inside an 
> Envelope in JSON format, to do better request logging.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema]





[GitHub] [kafka] showuon commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-09 Thread via GitHub


showuon commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463202757

   The test code, I mean. We already have some unit tests for 
`ProducerPerformance` 
[here](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java).
 Thanks.





[GitHub] [kafka] pprovenzano opened a new pull request, #13374: KRAFT-14765

2023-03-09 Thread via GitHub


pprovenzano opened a new pull request, #13374:
URL: https://github.com/apache/kafka/pull/13374

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-09 Thread via GitHub


hudeqi commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463200864

   > @hudeqi , thanks for the patch. This change makes sense to me. Could you 
add tests for this change?
   
   Do you want to add test screenshots/results for this case, or the test code 
corresponding to this change?





[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-09 Thread via GitHub


hudeqi commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463199545

   > >
   
   





[GitHub] [kafka] hudeqi closed pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-09 Thread via GitHub


hudeqi closed pull request #13348: MINOR: Fix ProducerPerformance still 
counting successful sending when sending failed
URL: https://github.com/apache/kafka/pull/13348





[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-09 Thread via GitHub


hudeqi commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1463199231

   > 
   
   





[GitHub] [kafka] tinaselenge opened a new pull request, #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-09 Thread via GitHub


tinaselenge opened a new pull request, #13373:
URL: https://github.com/apache/kafka/pull/13373

   Implements 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-894%3A+Use+incrementalAlterConfigs+API+for+syncing+topic+configurations
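   The core rationale of KIP-894 can be sketched with a toy model (plain maps 
standing in for the real `Admin` client API — the method and class names below 
are illustrative, not Kafka's): the legacy `alterConfigs` path replaces the 
full config set, so a sync that omits a key clears it, while an incremental 
update only touches the keys it names:

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalAlterSketch {
    // Legacy alterConfigs semantics: the submitted map becomes the entire
    // config, so anything not resubmitted is lost.
    static Map<String, String> fullAlter(Map<String, String> current,
                                         Map<String, String> submitted) {
        return new HashMap<>(submitted);
    }

    // incrementalAlterConfigs semantics: only the named keys are changed;
    // all other existing configs survive untouched.
    static Map<String, String> incrementalAlter(Map<String, String> current,
                                                Map<String, String> ops) {
        Map<String, String> result = new HashMap<>(current);
        result.putAll(ops);
        return result;
    }
}
```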
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jsancio merged pull request #13366: MINOR; Export control record type value

2023-03-09 Thread via GitHub


jsancio merged PR #13366:
URL: https://github.com/apache/kafka/pull/13366





[GitHub] [kafka] jsancio commented on pull request #13366: MINOR; Export control record type value

2023-03-09 Thread via GitHub


jsancio commented on PR #13366:
URL: https://github.com/apache/kafka/pull/13366#issuecomment-1462954942

   Merging. Unrelated test failures.





[GitHub] [kafka] cmccabe merged pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


cmccabe merged PR #13337:
URL: https://github.com/apache/kafka/pull/13337





[GitHub] [kafka] cmccabe commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


cmccabe commented on code in PR #13337:
URL: https://github.com/apache/kafka/pull/13337#discussion_r1131705968


##
metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java:
##
@@ -200,7 +201,24 @@ void resetSnapshotCounters() {
 }
 
 @Override
-public void publishSnapshot(
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+switch (manifest.type()) {
+case LOG_DELTA:
+publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+break;
+case SNAPSHOT:
+publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+break;
+default:
+break;

Review Comment:
   Actually, let's just remove the `default` case. spotbugs will flag 
non-exhaustive switch statements on enums. And that's a hard build error for us.
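   A minimal self-contained illustration of the exhaustive-switch pattern 
described above (the enum and method names here are hypothetical stand-ins for 
the loader's manifest types):

```java
// Hypothetical stand-in for the loader's manifest type enum.
enum ManifestType { LOG_DELTA, SNAPSHOT }

public class ExhaustiveSwitchDemo {
    // With no `default` branch, adding a new enum constant later makes static
    // analysis (e.g. spotbugs) flag this switch as non-exhaustive, turning a
    // silently-ignored case into a build error.
    public static String dispatch(ManifestType type) {
        switch (type) {
            case LOG_DELTA:
                return "publishLogDelta";
            case SNAPSHOT:
                return "publishSnapshot";
        }
        // Unreachable for current constants; satisfies the compiler's
        // requirement that all paths return.
        throw new IllegalStateException("Unhandled manifest type: " + type);
    }
}
```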






[GitHub] [kafka] rondagostino commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


rondagostino commented on PR #13337:
URL: https://github.com/apache/kafka/pull/13337#issuecomment-1462835684

   Looks like compile failures in `BrokerMetadataPublisherTest.scala`





[GitHub] [kafka] rondagostino commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


rondagostino commented on code in PR #13337:
URL: https://github.com/apache/kafka/pull/13337#discussion_r1131578840


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -213,32 +212,13 @@ class BrokerMetadataPublisher(
   }
 
   // Apply configuration deltas.
-  dynamicConfigPublisher.publish(delta, newImage)
+  dynamicConfigPublisher.onMetadataUpdate(delta, newImage)
 
   // Apply client quotas delta.
-  try {
-Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
-  clientQuotaMetadataManager.update(clientQuotasDelta)
-}
-  } catch {
-case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
updating client " +
-  s"quotas in $deltaName", t)
-  }
+  dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
 
-  // Apply changes to SCRAM credentials.
-  Option(delta.scramDelta()).foreach { scramDelta =>
-scramDelta.changes().forEach {
-  case (mechanism, userChanges) =>
-userChanges.forEach {
-  case (userName, change) =>
-if (change.isPresent) {
-  credentialProvider.updateCredential(mechanism, userName, 
change.get().toCredential(mechanism))
-} else {
-  credentialProvider.removeCredentials(mechanism, userName)
-}
-}
-}
-  }
+  // APply SCRAM delta.

Review Comment:
   nit: `s/AP/Ap/`






[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

2023-03-09 Thread via GitHub


kirktrue commented on PR #13301:
URL: https://github.com/apache/kafka/pull/13301#issuecomment-1462812863

   @guozhangwang I've added some dedicated unit tests. Keep in mind that these 
classes are all still used by the `Fetcher`, so the extensive 
`FetcherTest` class covers them too. When running `FetcherTest` in code 
coverage mode, well over 95% of the code is already covered by the existing 
tests.





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2023-03-09 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698597#comment-17698597
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-2967:
-

[~tombentley] , to alleviate the broken-links issue a bit, we could reproduce 
the anchors for some of the main headers on the index page and place the link 
to each section below them. Something like this:

!https://raw.githubusercontent.com/jeqo/ak-docs/main/index_redirects.png!

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * Its just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[jira] [Assigned] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-03-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-13891:
-

Assignee: Philip Nee  (was: Kirk True)

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when the sync group failed 
> with a rebalanceInProgress error. So the previous bug still exists, and it may 
> cause the consumer to rebalance many rounds before finally stabilizing.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced the group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # a new rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A doesn't send out its sync group request immediately
>  # every other consumer completed the sync group successfully in generation 2, 
> except consumer A
>  # after consumer A sends out its sync group request, a new rebalance starts 
> with generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the 
> sync group response
>  # when receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1
>  # so now an out-of-date ownedPartition is sent, with unexpected results
>  # *After the generation-3 rebalance, consumer A gets partitions P3/P4; the 
> ownedPartition is ignored because of the old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again, resulting in many rounds of 
> rebalance before the group becomes stable*
>  





[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-03-09 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698586#comment-17698586
 ] 

Kirk True commented on KAFKA-13891:
---

[~pnee] please let me know if you need help on this. Happy to help where I can.

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when the sync group failed 
> with a rebalanceInProgress error. So the previous bug still exists, and it may 
> cause the consumer to rebalance many rounds before finally stabilizing.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced the group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # a new rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A doesn't send out its sync group request immediately
>  # every other consumer completed the sync group successfully in generation 2, 
> except consumer A
>  # after consumer A sends out its sync group request, a new rebalance starts 
> with generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the 
> sync group response
>  # when receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1
>  # so now an out-of-date ownedPartition is sent, with unexpected results
>  # *After the generation-3 rebalance, consumer A gets partitions P3/P4; the 
> ownedPartition is ignored because of the old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again, resulting in many rounds of 
> rebalance before the group becomes stable*
>  





[GitHub] [kafka] mumrah commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


mumrah commented on code in PR #13337:
URL: https://github.com/apache/kafka/pull/13337#discussion_r1131462269


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -449,6 +450,15 @@ SnapshotManifest loadSnapshot(
 public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
 eventQueue.append(() -> {
 currentLeaderAndEpoch = leaderAndEpoch;
+for (MetadataPublisher publisher : publishers.values()) {
+try {
+publisher.onControllerChange(currentLeaderAndEpoch);
+} catch (Throwable e) {
+faultHandler.handleFault("Unhandled error publishing the 
new leader " +
+"change to " + currentLeaderAndEpoch + " with 
publisher " +
+publisher.name(), e);

Review Comment:
   nit: tabs



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -223,16 +223,21 @@ public String name() {
 }
 
 @Override
-public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, 
SnapshotManifest manifest) {
-enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(), 
true, NO_OP_HANDLER);
+public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
+eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
 }
 
 @Override
-public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, 
LogDeltaManifest manifest) {
-if (!leaderAndEpoch.equals(manifest.leaderAndEpoch())) {
-eventQueue.append(new KRaftLeaderEvent(manifest.leaderAndEpoch()));
-}
-enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(), 
false, NO_OP_HANDLER);
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+enqueueMetadataChangeEvent(delta,
+newImage,

Review Comment:
   nit: tabs



##
metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java:
##
@@ -200,7 +201,24 @@ void resetSnapshotCounters() {
 }
 
 @Override
-public void publishSnapshot(
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+switch (manifest.type()) {
+case LOG_DELTA:
+publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+break;
+case SNAPSHOT:
+publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+break;
+default:
+break;

Review Comment:
   wdyt about throwing an `IllegalStateException` in the `default`? I do that 
sometimes to future-proof against unhandled enum values down the road. 



##
metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+
+/**
+ * Contains information about what was loaded.
+ */
+public interface LoaderManifest {

Review Comment:
   Ok, fair enough. Works for me 👍 






[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-09 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698128#comment-17698128
 ] 

Greg Harris edited comment on KAFKA-14666 at 3/9/23 6:29 PM:
-

These solutions must:
1. Provide correct translation of offsets (checkpoints can _never_ be ahead of 
the ideal checkpoint)
2. Provide best-effort translation of offsets (checkpoints can _sometimes_ be 
behind the ideal checkpoint)
3. Ensure that checkpoints are monotonic (if the upstream group does not rewind 
offsets, then later checkpoints must always be for higher offsets than earlier 
checkpoints)
4. Provide translation of offsets prior to the latest offset sync (earlier 
non-compacted offset syncs can influence translation)
5. Prioritize accuracy for consumer groups near the end of the replicated topic 
(the difference between a checkpoint and the ideal checkpoint may be larger for 
earlier offsets than for later ones)

I explored some of the solution space; I think there are several different 
designs, of varying viability, that might solve this use case:
1. Store fine-grained offset syncs for some bounded window (store every offset 
sync within max.offset.lag of the latest one)
2. Store coarse-grained offset syncs for an unbounded window (store the first 
offset after each power of 2 offset, biased towards the end of the topic)
3. Store fine-grained offset syncs for an unbounded window (keep the whole 
offset syncs topic in-memory)
4. Re-read the complete offset syncs topic periodically and use the offset sync 
which most closely precedes the consumer group offset.
5. Maintain an in-memory index from source group-topic-partition to 
offset-sync-topic offsets, and seek a single consumer over the offset-sync 
topic to lookup the relevant offset sync information. (like having 1 offset 
sync consumer per consumer group partition, but multiplexed into one physical 
consumer per checkpoint task)

Solutions 1, 2, and 3 only read the offset syncs topic once on startup, at the 
cost of some additional memory to store some or all of the offset syncs topic.
Solutions 4 and 5 re-read the topic to avoid storing additional topic contents, 
with either no or minimal in-memory storage.
Solution 5 would re-read the topic on startup and after upstream consumer group 
rewinds, and read the topic once for each group-topic-partition in steady-state.

Solutions 1, 2, and 3 require less significant architectural changes.
Solutions 4 and 5 are significant reimplementations of the translation logic, 
which would need to stop storing offset syncs topic contents and start storing 
state per translated group-topic-partition.

Solutions 3, 4, and 5, which offer "perfect" translation, do not have a 
significant advantage over the lossy solutions 1 and 2: since the offset-syncs 
topic is compacted, we can at most provide best-effort translation.

Solution 1 does not provide translation of arbitrarily old offsets, but could 
be made configurable so that users can set the translatable window to one which 
is acceptable for the live workload.
Solutions 2, 3, 4, and 5 allow translation for arbitrarily old offsets, where 
the order of compaction may affect monotonicity guarantees. These solutions 
would need to read their own checkpoints topic to determine if the translated 
offset would be monotonic.

None of these options appears clearly better than the others; further 
discussion and evaluation will be needed.
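Requirements 1 and 2 above can be illustrated with a small sketch: translating a consumer group offset through the nearest *preceding* offset sync can lag the ideal checkpoint, but can never run ahead of it. The class and method names below are illustrative only, not MirrorMaker 2's actual implementation:

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative sketch (not MirrorMaker 2's real code): an index of offset
 * syncs keyed by upstream offset. translate() returns the downstream offset
 * of the nearest preceding sync, which may be behind the ideal checkpoint
 * (requirement 2) but is never ahead of it (requirement 1).
 */
class OffsetSyncIndex {
    // upstream offset -> downstream offset at the time of the sync
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void record(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
        // Solutions 1-3 differ only in which entries this method retains
        // (bounded window, power-of-two spacing, or everything).
    }

    /** Conservative translation; -1 if no sync precedes the group offset. */
    long translate(long upstreamGroupOffset) {
        Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamGroupOffset);
        return floor == null ? -1L : floor.getValue();
    }
}
```

With syncs at (0 → 100) and (10 → 110), translating upstream offset 15 yields 110 rather than the ideal 115 — behind, but never ahead.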


was (Author: gharris1727):
These solutions must:
1. Provide correct translation of offsets (checkpoints can _never_ be ahead of 
the ideal checkpoint)
2. Provide best-effort translation of offsets (checkpoints can _sometimes_ be 
behind than the ideal checkpoint)
3. Ensure that checkpoints are monotonic (if the upstream group does not rewind 
offsets, then later checkpoints must always be for higher offsets than earlier 
checkpoints)
4. Provide translation of offsets prior to the latest offset sync (earlier 
non-compacted offset syncs can influence translation)
5. Prioritize accuracy for consumer groups near the end of the replicated topic 
(the difference between a checkpoint and the ideal checkpoint for earlier 
offsets may be larger than later offsets)

I explored some of the solution space, I think that there are some different 
designs that might be usable to solve this use-case, of varying viability:
1. Store fine-grained offset syncs for some bounded window (store every offset 
sync within max.offset.lag of the latest one)
2. Store coarse-grained offset syncs for an unbounded window (store the first 
offset after each power of 2 offset, biased towards the end of the topic)
3. Store fine-grained offset syncs for an unbounded window (keep the whole 
offset syncs topic in-memory)
4. Re-read the complete offset syncs topic periodically and emit use the offset 
sync which most closely precedes the consumer group offset.

[GitHub] [kafka] cmccabe merged pull request #13370: MINOR: Add unclean field of PartitionReassignmentRevert to hashCode equals and toString

2023-03-09 Thread via GitHub


cmccabe merged PR #13370:
URL: https://github.com/apache/kafka/pull/13370


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


cmccabe commented on PR #13337:
URL: https://github.com/apache/kafka/pull/13337#issuecomment-1462530266

   I fixed the case where I was invoking `DynamicConfigPublisher.publish` with 
a null. This was a bit silly... I was just being lazy about adding an extra 
function.
   
   I also fixed it so that `BrokerMetadataPublisher` is using 
`DynamicClientQuotaPublisher` and `ScramPublisher`. Ultimately this code should 
be the same on the broker and controller, just invoked slightly differently. (I 
have a follow-up change which will use Loader on the broker too, which will 
simplify this more.)





[GitHub] [kafka] cmccabe commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-09 Thread via GitHub


cmccabe commented on code in PR #13337:
URL: https://github.com/apache/kafka/pull/13337#discussion_r1131402040


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -213,7 +213,7 @@ class BrokerMetadataPublisher(
   }
 
   // Apply configuration deltas.
-  dynamicConfigPublisher.publish(delta, newImage)
+  dynamicConfigPublisher.publish(delta, newImage, null)

Review Comment:
   fixed



##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -31,10 +32,16 @@ class DynamicConfigPublisher(
   faultHandler: FaultHandler,
   dynamicConfigHandlers: Map[String, ConfigHandler],
   nodeType: String
-) extends Logging {
+) extends org.apache.kafka.image.publisher.MetadataPublisher with Logging {
   logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] "
 
-  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+  def name(): String = "DynamicConfigPublisher"
+
+  def publish(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest

Review Comment:
   fixed






[GitHub] [kafka] mumrah opened a new pull request, #13372: MINOR: Improved error handling in ZK migration

2023-03-09 Thread via GitHub


mumrah opened a new pull request, #13372:
URL: https://github.com/apache/kafka/pull/13372

   This patch fixes many small issues to improve error handling and logging 
during the ZK migration. A test was added to simulate a ZK session expiration 
to ensure the correctness of the migration driver.
   
   With this change, ZK errors thrown during the migration will not hit the 
fault handler registered with the KRaftMigrationDriver, but they will be 
logged. 





[GitHub] [kafka] CalvinConfluent commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-09 Thread via GitHub


CalvinConfluent commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1462489865

   @dajac Done.





[GitHub] [kafka] mimaison commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


mimaison commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1131358290


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+    private final Map<String, String> envVarMap;
+
+    public EnvVarConfigProvider() {
+        envVarMap = getEnvVars();
+    }
+
+    public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+        envVarMap = envVarsAsArgument;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }

Review Comment:
   You can make a similar point about the other ConfigProvider implementations. 
For example FileConfigProvider can be used to read arbitrary files. 
   
   It would be good to have a KIP to allow restricting the providers, for 
example bound FileConfigProvider to a directory, or force a prefix for 
environment variables.
   
   Since providers are not enabled by default, I don't think this is required 
to merge this.
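The restriction suggested above — forcing a prefix for environment variables — could be sketched roughly as follows. This is a hypothetical illustration, not the API that KIP-887 or the PR actually defines:

```java
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: only expose environment variables whose names start
 * with a configured prefix, so a config provider cannot leak unrelated
 * variables. The real provider may use a different mechanism (e.g. an
 * allowlist pattern).
 */
class PrefixedEnvVarFilter {
    static Map<String, String> filterByPrefix(Map<String, String> env, String prefix) {
        return env.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

In practice the input would be `System.getenv()`; a fixed map is used here so the behavior is deterministic.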






[GitHub] [kafka] dajac commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-09 Thread via GitHub


dajac commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1462452965

   @CalvinConfluent Could you rebase the PR? There are a few conflicts.





[GitHub] [kafka] bbejeck commented on pull request #13371: MINOR: Updating video links to ASF YouTube

2023-03-09 Thread via GitHub


bbejeck commented on PR #13371:
URL: https://github.com/apache/kafka/pull/13371#issuecomment-1462430869

   Merged #13371 into trunk





[GitHub] [kafka] bbejeck merged pull request #13371: MINOR: Updating video links to ASF YouTube

2023-03-09 Thread via GitHub


bbejeck merged PR #13371:
URL: https://github.com/apache/kafka/pull/13371





[jira] [Resolved] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics

2023-03-09 Thread Waleed Fateem (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Waleed Fateem resolved KAFKA-6301.
--
Resolution: Won't Fix

Really old and irrelevant at this point.

> Incorrect Java Regex example '*' for mirroring all topics
> -
>
> Key: KAFKA-6301
> URL: https://issues.apache.org/jira/browse/KAFKA-6301
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.10.2.0, 0.11.0.0, 1.0.0
>Reporter: Waleed Fateem
>Assignee: Waleed Fateem
>Priority: Minor
>  Labels: documentation, mirror-maker
>
> The documentation for section "Mirroring data between clusters" states the 
> following:
> Or you could mirror all topics using --whitelist '*'
> The regular expression should be '.*' instead. 
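The difference is easy to demonstrate: `*` alone is a dangling quantifier and not a valid Java regex at all, while `.*` matches any topic name. A small standalone check (not MirrorMaker code):

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Shows why --whitelist '*' fails while '.*' works:
// '*' is a dangling quantifier, rejected by Pattern.compile.
class WhitelistRegexCheck {
    static boolean isValidRegex(String s) {
        try {
            Pattern.compile(s);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }
}
```

`WhitelistRegexCheck.isValidRegex("*")` returns false, whereas `".*"` compiles and `"anyTopic".matches(".*")` is true.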



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bbejeck commented on pull request #13371: MINOR: Updating video links to ASF YouTube

2023-03-09 Thread via GitHub


bbejeck commented on PR #13371:
URL: https://github.com/apache/kafka/pull/13371#issuecomment-1462387972

   ping one of @mimaison , @mjsax , @cadonna for review





[GitHub] [kafka] bbejeck opened a new pull request, #13371: MINOR: Updating video links to ASF YouTube

2023-03-09 Thread via GitHub


bbejeck opened a new pull request, #13371:
URL: https://github.com/apache/kafka/pull/13371

   Mirror PR for https://github.com/apache/kafka-site/pull/495 in site docs
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


Schm1tz1 commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1131302858


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -41,6 +46,14 @@ public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
 
     @Override
     public void configure(Map<String, ?> configs) {
+        if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) {

Review Comment:
   Done, thanks for the hint!






[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


Schm1tz1 commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1131296480


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.AbstractConfig;

Review Comment:
   Resolved as I merged it into the main class according to the suggestion of 
@mimaison 






[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


Schm1tz1 commented on PR #12992:
URL: https://github.com/apache/kafka/pull/12992#issuecomment-1462373253

   > > > Thanks for the PR! I left a suggestion
   > > 
   > > 
   > > Yes, concerning the separate class I was thinking the same to be honest, 
also for the static strings for the properties/docs definitions. I don't see a 
reference implementation for other providers as they are not configurable but 
happy to hear any suggestions.
   > 
   > Unfortunately we don't have a mechanism for documenting this type of 
plugins. I think it's fine to keep the doc as an unused constant or simply as a 
comment for now.
   
   Ok, merged it into the provider class.





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1131241761


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap byTopicName = new 
HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   Sounds good to me. Thanks!






[jira] [Commented] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction

2023-03-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698469#comment-17698469
 ] 

Chris Egerton commented on KAFKA-14799:
---

Right now I'm leaning toward option 1, probably with a {{WARN}}-level log 
message.

> Source tasks fail if connector attempts to abort empty transaction
> --
>
> Key: KAFKA-14799
> URL: https://issues.apache.org/jira/browse/KAFKA-14799
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> If a source task invokes 
> [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
>  while the current transaction is empty, and then returns an empty batch of 
> records from the next (or current) invocation of {{{}SourceTask::poll{}}}, 
> the task will fail.
> This is because the Connect framework will honor the transaction abort 
> request by invoking 
> [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
>  but without having first invoked 
> [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
>  (since no records had been received from the task), which leads to an 
> {{{}IllegalStateException{}}}.
> An example stack trace for this scenario:
> {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] 
> ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an 
> uncaught and unrecoverable exception. Task is being killed and will not 
> recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:210)
> java.lang.IllegalStateException: TransactionalId 
> exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid 
> transition attempted from state READY to state ABORTING_TRANSACTION
>     at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>     at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
>     at 
> org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
>     at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
>     at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
>     at 
> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
>     at 
> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
>     at 
> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
>     at 
> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
>     at 
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
>  
> As far as a fix goes, we have a few options:
>  # Gracefully handle this case by translating the call to 
> {{TransactionContext::abortTransaction}} into a no-op
>  # Throw an exception (probably an {{{}IllegalStateException{}}}) from 
> {{{}TransactionContext::abortTransaction{}}}, which may fail the task, but 
> would give it the option to swallow the exception and continue processing if 
> it would like
>  # Forcibly fail the task without giving it the chance to swallow an 
> exception, using a similar strategy to how we fail tasks tha

[jira] [Created] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction

2023-03-09 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14799:
-

 Summary: Source tasks fail if connector attempts to abort empty 
transaction
 Key: KAFKA-14799
 URL: https://issues.apache.org/jira/browse/KAFKA-14799
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


If a source task invokes 
[TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
 while the current transaction is empty, and then returns an empty batch of 
records from the next (or current) invocation of {{{}SourceTask::poll{}}}, the 
task will fail.

This is because the Connect framework will honor the transaction abort request 
by invoking 
[KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
 but without having first invoked 
[KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
 (since no records had been received from the task), which leads to an 
{{{}IllegalStateException{}}}.

An example stack trace for this scenario:
{quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] 
ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an 
uncaught and unrecoverable exception. Task is being killed and will not recover 
until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
java.lang.IllegalStateException: TransactionalId 
exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid 
transition attempted from state READY to state ABORTING_TRANSACTION
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at 
org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
    at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{quote}
 

As far as a fix goes, we have a few options:
 # Gracefully handle this case by translating the call to 
{{TransactionContext::abortTransaction}} into a no-op
 # Throw an exception (probably an {{{}IllegalStateException{}}}) from 
{{{}TransactionContext::abortTransaction{}}}, which may fail the task, but 
would give it the option to swallow the exception and continue processing if it 
would like
 # Forcibly fail the task without giving it the chance to swallow an exception, 
using a similar strategy to how we fail tasks that request that a transaction 
be committed and aborted for the same record (see 
[here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
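A minimal sketch of option 1 — tracking whether a transaction is actually open and turning a redundant abort into a no-op instead of forwarding it to the producer. The class and method names here are hypothetical, not Connect's actual implementation:

```java
/**
 * Hypothetical sketch of fix option 1 for KAFKA-14799: ignore an abort
 * request when no transaction is open, instead of calling
 * KafkaProducer.abortTransaction(), which would throw IllegalStateException
 * on the READY -> ABORTING_TRANSACTION transition. Names are illustrative.
 */
class TransactionStateGuard {
    private boolean transactionOpen = false;

    /** Called when the first record of a batch opens a producer transaction. */
    void onTransactionBegin() {
        transactionOpen = true;
    }

    /** Returns true if the abort was forwarded to the producer, false if it was a no-op. */
    boolean maybeAbort(Runnable producerAbort) {
        if (!transactionOpen) {
            // Option 1: no-op (a WARN-level log message could go here)
            return false;
        }
        producerAbort.run();
        transactionOpen = false;
        return true;
    }
}
```

The `Runnable` stands in for the real producer call so the guard logic can be exercised without a broker.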





[GitHub] [kafka] andymg3 opened a new pull request, #13370: MINOR: Add unclean field of PartitionReassignmentRevert to hashCode Equals and toString

2023-03-09 Thread via GitHub


andymg3 opened a new pull request, #13370:
URL: https://github.com/apache/kafka/pull/13370

   ### Details
   It looks like we forgot to add the `unclean` field of 
`PartitionReassignmentRevert` to a few methods. So adding it here. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1131139080


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap byTopicName = new 
HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   Sounds good. Thanks for your guidance. As you mentioned, this PR is already 
quite large, so if you agree, I will go ahead and implement this change first, 
in a PR of its own. Thanks!






[GitHub] [kafka] mimaison commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


mimaison commented on PR #12992:
URL: https://github.com/apache/kafka/pull/12992#issuecomment-1462175567

   > > Thanks for the PR! I left a suggestion
   > 
   > Yes, concerning the separate class I was thinking the same to be honest, 
also for the static strings for the properties/docs definitions. I don't see a 
reference implementation for other providers as they are not configurable but 
happy to hear any suggestions.
   
   Unfortunately we don't have a mechanism for documenting this type of plugin. I think it's fine to keep the doc as an unused constant or simply as a comment for now.





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1131114261


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   How about doing the following?
   
   We change the signature of `GroupCoordinator.handleCommitOffsets` to the 
following:
   
   ```
  def handleCommitOffsets(groupId: String,
                          memberId: String,
                          groupInstanceId: Option[String],
                          generationId: Int,
                          offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
                          responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
                          requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
   ```
   
   Note the change from `TopicPartition` to `TopicIdPartition` for 
`offsetMetadata` and `responseCallback`.
   
   Then, we have to adapt the implementation of `handleCommitOffsets` to get 
the `TopicPartition` from the `TopicIdPartition` where required. We can keep 
`pendingOffsetCommits` and `offsets` keyed by `TopicPartition` for now in 
`GroupMetadataManager`.
   
   This allows preserving the topic ids provided to the GroupCoordinator, but it does not provide any stronger guarantee for the offsets yet (as you pointed out). With this approach, we don't depend on the resolver at all.
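   The adaptation step suggested here (accepting `TopicIdPartition`-keyed maps at the coordinator boundary while `GroupMetadataManager` stays keyed by `TopicPartition`) can be sketched roughly as follows. The record types below are simplified, hypothetical stand-ins for the real `org.apache.kafka.common` classes, not Kafka's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CommitOffsetsSketch {
    // Hypothetical, simplified stand-ins for org.apache.kafka.common.TopicPartition
    // and TopicIdPartition (for illustration only).
    record TopicPartition(String topic, int partition) {}
    record TopicIdPartition(UUID topicId, TopicPartition topicPartition) {}

    // The coordinator receives offsets keyed by TopicIdPartition, but
    // GroupMetadataManager stays keyed by TopicPartition for now, so the id
    // is stripped at the boundary where required.
    static Map<TopicPartition, Long> byTopicPartition(Map<TopicIdPartition, Long> offsets) {
        Map<TopicPartition, Long> out = new HashMap<>();
        offsets.forEach((tip, offset) -> out.put(tip.topicPartition(), offset));
        return out;
    }

    public static void main(String[] args) {
        TopicIdPartition tip =
            new TopicIdPartition(UUID.randomUUID(), new TopicPartition("foo", 0));
        Map<TopicPartition, Long> plain = byTopicPartition(Map.of(tip, 42L));
        System.out.println(plain.get(new TopicPartition("foo", 0))); // prints 42
    }
}
```

   The topic id is preserved all the way into the coordinator call and only dropped where the name-keyed storage requires it, which is the property the comment above relies on.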






[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1131047710


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   Thanks for the answer. If I understand correctly, we would then have a resolution of topic ids from topic-name-based persisted data, so this may not prevent offsets from one topic being provided as those of another topic with the same name (defined at a different point in time on the server)?
   
   The resolution can be done in the group coordinator layer, assuming it has access to the topic id resolved upstream by the request handler. Because we want to preserve the same mapping used when request processing started, we need to ensure the right ids are used within the adaptor's `GroupCoordinator#commitOffsets` method. Since the mapping returned from the metadata cache depends on the snapshot used at the time the mapping is requested, if the adaptor retrieves it from the metadata cache internally, at a different time from the request handler, there is no guarantee that the metadata is the same, and hence no guarantee that the topic IDs registered with the broker are the same.
   
   This means that the topic ids need to be propagated from the request handler (`KafkaApis`) to the coordinator adaptor somehow. Without a change in the method and contract implemented by the coordinator, these ids could be transferred via the `OffsetCommitRequestData` DTO directly, which means a change in the API schema would be required first. Alternatively, we may want to change the interface of the coordinator and the signature of the offset commit method to allow for the propagation of topic ids.
   
   I may be missing the entire thing though?
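   The snapshot-consistency concern described above can be illustrated with a small sketch. The `SnapshotResolver` class and its API are invented for illustration and are not part of Kafka; the point is that pinning the name-to-id mapping once, at request-handling time, keeps later lookups consistent even if the metadata cache changes underneath:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class SnapshotResolver {
    private final Map<String, UUID> nameToId; // immutable snapshot

    public SnapshotResolver(Map<String, UUID> snapshot) {
        this.nameToId = Map.copyOf(snapshot); // pin the view at construction time
    }

    public Optional<UUID> topicId(String name) {
        return Optional.ofNullable(nameToId.get(name));
    }

    public static void main(String[] args) {
        Map<String, UUID> cache = new HashMap<>(); // stands in for the metadata cache
        UUID original = UUID.randomUUID();
        cache.put("orders", original);

        // Snapshot taken once when the request handler starts processing.
        SnapshotResolver pinned = new SnapshotResolver(cache);

        // Topic "orders" is deleted and re-created with a new id later on.
        cache.put("orders", UUID.randomUUID());

        // The pinned resolver still returns the id observed at request time.
        System.out.println(pinned.topicId("orders").get().equals(original)); // prints true
    }
}
```

   Resolving against the live cache at a later point would return the new id instead, which is exactly the mismatch the comment warns about.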










[GitHub] [kafka] urbandan commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-09 Thread via GitHub


urbandan commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1130991899


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBackingStore.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public abstract class KafkaTopicBackingStore {
+private static final Logger log = 
LoggerFactory.getLogger(KafkaTopicBackingStore.class);
+
+Consumer<TopicAdmin> topicInitializer(String topic, NewTopic topicDescription, WorkerConfig config, Time time) {
+return admin -> {
+log.debug("Creating admin client to manage Connect internal {} 
topic", getTopicPurpose());

Review Comment:
   nit: I know this was copied over from the old method, but this doesn't seem 
accurate, can you please fix it?



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBackingStore.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public abstract class KafkaTopicBackingStore {

Review Comment:
   nit: KafkaTopicStore or KafkaTopicBackedStore would be more descriptive



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +331,41 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due 
to InvalidReplicationFactorException
+ *
+ * @param topicDescription
+ * @param timeoutMs
+ * @param backOffMs
+ * @param time
+ * @return the same as {@link TopicAdmin#createTopics(NewTopic...)}
+ */
+public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) {
+Timer timer = time.timer(timeoutMs);
+do {
+try {
+return createTopics(topicDescription);
+} catch (ConnectException e) {
+if (timer.notExpired() && retryableTopicCreationException(e)) {
+   
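   The quoted method is cut off by the archive, but the general shape of such a timer-bounded retry loop can be sketched as follows. `withRetry` and `RetryableException` are hypothetical stand-ins, not Connect's actual `TopicAdmin` API, and the sketch uses the system clock instead of Kafka's `Time`/`Timer` utilities:

```java
import java.util.function.Supplier;

public class RetrySketch {
    // Hypothetical marker for errors worth retrying (e.g. the
    // InvalidReplicationFactorException case discussed above).
    static class RetryableException extends RuntimeException {}

    // Keep retrying the action with a fixed back-off until it succeeds or the
    // overall timeout expires, in which case the last exception is rethrown.
    static <T> T withRetry(Supplier<T> action, long timeoutMs, long backOffMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return action.get();
            } catch (RetryableException e) {
                if (System.currentTimeMillis() >= deadline) throw e; // timer expired
                Thread.sleep(backOffMs); // back off before the next attempt
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        String result = withRetry(() -> {
            if (++attempts[0] < 3) throw new RetryableException(); // fail twice
            return "created";
        }, 1000L, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts");
        // prints "created after 3 attempts"
    }
}
```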

[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1130942769


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   The OffsetCommitValue part is not possible at the moment because we don't have a way to downgrade. My colleague @jeffkbkim is working on a proposal for this. We could start by either migrating from TopicPartition to TopicIdPartition, or by handling this in the GroupCoordinatorAdaptor layer. The former is likely simpler.






[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


Schm1tz1 commented on PR #12992:
URL: https://github.com/apache/kafka/pull/12992#issuecomment-1461890141

   > Thanks for the PR! I left a suggestion
   
   Yes, I was thinking the same to be honest, also for the static strings for the properties/docs definitions. I don't see a reference implementation for other providers as they are not configurable, but I'm happy to hear any suggestions.





[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-03-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698327#comment-17698327
 ] 

Sagar Rao commented on KAFKA-13295:
---

Had a discussion with [~guozhang] and the suggestion is to wait for the 4.0 release, when EOS-v1 will be deprecated. It would be simpler to implement the changes in EOS-v2.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 4.0.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-03-09 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-13295:
--
Fix Version/s: 4.0.0
   (was: 3.5.0)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 4.0.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.





[jira] [Updated] (KAFKA-14798) corrupted consumer offsets

2023-03-09 Thread Aravind (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aravind updated KAFKA-14798:

Description: 
Kafka version 3.2.3

The consumer offsets for some partitions in a Kafka topic are not refreshed after a restart of the consumer application, which causes negative lag to accumulate; the messages in those partitions are not consumed by the application until the log-end offset reaches the older consumer offset.
{noformat}
2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-dummy-1, 
groupId=dummy] User provided listener 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener
 failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, 
dummy-2] 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined   2023-02-07 
14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] 
essageListenerContainer$ListenerConsumer : Consumer 
exceptionjava.lang.IllegalStateException: This error handler cannot process 
'org.apache.kafka.common.errors.TimeoutException's; no record information is 
available     
    at 
org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) 
~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source) ~[na:na]     
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]   
  at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined
   {noformat}
We get the "no record information available" consumer exception because the FetchPosition points to the old committed offset.

  was:
Kafka version 3.2.3

The consumer offsets for some partitions in a kafka topic are not refreshing 
after a restart of consumer application which made negative lag accumulating 
and the messages in the partitions are not consumed by the application until 
the log-end offset equals to older consumer offset. 
{noformat}
2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-dummy-1, 
groupId=dummy] User provided listener 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener
 failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, 
dummy-2] 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined
 
2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] 
essageListenerContainer$ListenerConsumer : Consumer 
exceptionjava.lang.IllegalStateException: This error handler cannot process 
'org.apache.kafka.common.errors.TimeoutException's; no record information is 
available     at 
org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) 
~[spring-kafka-2.7.9.jar!/:2.7.9]     at 
org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[na:na]     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) 
~[na:na]     at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused 
by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined
 {noformat}
Getting "no record information available" consumer exception as the 
FetchPosition is pointing to old committed offset.


> corrupted consumer offsets
> --
>
> Key: KAFKA-14798
> URL: https://issues.apache.org/jira/browse/KAFKA-14798
> Project: Kafka
>  Issue Typ

[jira] [Updated] (KAFKA-14798) corrupted consumer offsets

2023-03-09 Thread Aravind (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aravind updated KAFKA-14798:

Description: 
Kafka version 3.2.3

The consumer offsets for some partitions in a Kafka topic are not refreshed after a restart of the consumer application, which causes negative lag to accumulate; the messages in those partitions are not consumed by the application until the log-end offset reaches the older consumer offset.
{noformat}
2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-dummy-1, 
groupId=dummy] User provided listener 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener
 failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, 
dummy-2] 

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined   

2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] 
essageListenerContainer$ListenerConsumer : Consumer 
exceptionjava.lang.IllegalStateException: This error handler cannot process 
'org.apache.kafka.common.errors.TimeoutException's; no record information is 
available     
    at 
org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) 
~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source) ~[na:na]     
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]   
  at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined
   {noformat}
We get the "no record information available" consumer exception because the FetchPosition points to the old committed offset.

  was:
Kafka version 3.2.3

The consumer offsets for some partitions in a kafka topic are not refreshing 
after a restart of consumer application which made negative lag accumulating 
and the messages in the partitions are not consumed by the application until 
the log-end offset equals to older consumer offset. 
{noformat}
2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-dummy-1, 
groupId=dummy] User provided listener 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener
 failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, 
dummy-2] 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined   2023-02-07 
14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] 
essageListenerContainer$ListenerConsumer : Consumer 
exceptionjava.lang.IllegalStateException: This error handler cannot process 
'org.apache.kafka.common.errors.TimeoutException's; no record information is 
available     
    at 
org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) 
~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
 ~[spring-kafka-2.7.9.jar!/:2.7.9]     
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source) ~[na:na]     
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]   
  at java.base/java.lang.Thread.run(Unknown Source) ~[na:na] Caused by: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dummy-0 could be determined
   {noformat}
The "no record information is available" consumer exception is raised because the FetchPosition still points to the old committed offset.


> corrupted consumer offsets
> --
>
> Key: KAFKA-14798
> URL: https://issues.apache.org/jira/browse/KAFKA-14798
> P

[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1130866578


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   Hi David, thanks for the insight. I think you are right that implementing support for topic IDs in the functional layer before exposing it in the API makes sense, as it guarantees that offsets and metadata belong to the partitions of the right topic in case of homonyms.
   
   Now, one question is how deep we go in integrating IDs into this layer. Would you consider changing the data model authored by the group coordinator down to `OffsetCommitValue`, as prescribed by KIP-848?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14798) corrupted consumer offsets

2023-03-09 Thread Aravind (Jira)
Aravind created KAFKA-14798:
---

 Summary: corrupted consumer offsets
 Key: KAFKA-14798
 URL: https://issues.apache.org/jira/browse/KAFKA-14798
 Project: Kafka
  Issue Type: Bug
Reporter: Aravind


Kafka version 3.2.3

The consumer offsets for some partitions in a Kafka topic are not refreshed after a restart of the consumer application, which causes negative lag to accumulate; the messages in those partitions are not consumed by the application until the log-end offset catches up to the older committed offset.
{noformat}
2023-02-07 14:33:43.485 ERROR 15 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-dummy-1, groupId=dummy] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [dummy-0, dummy-1, dummy-2]
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined

2023-02-07 14:33:43.487 ERROR 15 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.TimeoutException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.9.jar!/:2.7.9]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.9.jar!/:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604) ~[spring-kafka-2.7.9.jar!/:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212) ~[spring-kafka-2.7.9.jar!/:2.7.9]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]
    at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dummy-0 could be determined
{noformat}
The "no record information is available" consumer exception is raised because the FetchPosition still points to the old committed offset.
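The negative-lag symptom described above boils down to a per-partition comparison of the log-end offset against the committed offset. A minimal, broker-free sketch of that check follows (plain Java with hypothetical helper names; in a real diagnosis the two maps would be fetched via `AdminClient#listConsumerGroupOffsets` and `KafkaConsumer#endOffsets`):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCheck {
    // Lag is log-end offset minus committed offset; a committed offset
    // ahead of the log-end offset yields a negative value, matching the
    // symptom reported in KAFKA-14798.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Returns the partitions whose committed offset is ahead of the log end,
    // keyed by partition name, with the (negative) lag as the value.
    static Map<String, Long> negativeLagPartitions(Map<String, Long> endOffsets,
                                                   Map<String, Long> committed) {
        Map<String, Long> bad = new HashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end != null && lag(end, e.getValue()) < 0) {
                bad.put(e.getKey(), lag(end, e.getValue()));
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, Long> end = Map.of("dummy-0", 100L, "dummy-1", 50L);
        Map<String, Long> committed = Map.of("dummy-0", 120L, "dummy-1", 40L);
        System.out.println(negativeLagPartitions(end, committed)); // {dummy-0=-20}
    }
}
```

Partitions flagged by such a check would stay unconsumed until the log-end offset passes the stale committed offset, which is exactly the stall window the reporter observed.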



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-09 Thread via GitHub


mimaison commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1130786389


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.AbstractConfig;

Review Comment:
   I wonder if we really need a separate class for this single config. Could we 
move it to `EnvVarConfigProvider`?
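The suggestion above is to fold the provider's single setting into the provider class itself rather than keeping a separate `*Config` class. A stand-alone sketch of that shape (all names hypothetical; the real `EnvVarConfigProvider` implements Kafka's `ConfigProvider` interface and reads `System.getenv()` rather than an injected map):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of an env-var provider that keeps its single "allowlist"
// setting inline instead of in a separate config class.
public class EnvVarProviderSketch {
    private final Map<String, String> env;   // injected for testability
    private final Set<String> allowlist;     // empty set means "allow all"

    EnvVarProviderSketch(Map<String, String> env, Set<String> allowlist) {
        this.env = env;
        this.allowlist = allowlist;
    }

    // Resolve the requested keys from the environment, honouring the allowlist.
    Map<String, String> get(Set<String> keys) {
        Map<String, String> out = new HashMap<>();
        for (String key : keys) {
            if (!allowlist.isEmpty() && !allowlist.contains(key)) continue;
            String value = env.get(key);
            if (value != null) out.put(key, value);
        }
        return out;
    }

    public static void main(String[] args) {
        EnvVarProviderSketch p = new EnvVarProviderSketch(
                Map.of("DB_HOST", "localhost", "SECRET", "s3cr3t"),
                Set.of("DB_HOST"));
        System.out.println(p.get(Set.of("DB_HOST", "SECRET"))); // {DB_HOST=localhost}
    }
}
```

With only one configuration knob, the inline approach avoids an extra public class, at the cost of losing a dedicated place for future settings to grow into.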






[GitHub] [kafka] dajac merged pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-03-09 Thread via GitHub


dajac merged PR #12813:
URL: https://github.com/apache/kafka/pull/12813





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1130671265


##
clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMapping.java:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between
+ * a name and an id.
+ *
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+public class TopicIdAndNameBiMapping {
+private final Map<String, Uuid> topicIds;
+private final Map<Uuid, String> topicNames;
+
+/**
+ * A mapping whose universe of topic IDs and names is captured from the input map. The reverse association
+ * between a topic ID and a topic name is computed by this method. If more than one topic name
+ * resolves to the same topic ID, an {@link InvalidTopicException} is thrown.
+ */
+public static TopicIdAndNameBiMapping fromTopicIds(Map<String, Uuid> topicIds) {

Review Comment:
   Both names are fine for me. I leave it up to you.
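The invariant under discussion — at most one topic name per topic ID — can be sketched as a tiny bidirectional map that rejects homonyms at construction time. This is only an illustration of the idea behind `TopicIdAndNameBiMapping`, using `java.util.UUID` in place of Kafka's `Uuid` and `IllegalArgumentException` in place of `InvalidTopicException`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TopicBiMapSketch {
    private final Map<String, UUID> topicIds;
    private final Map<UUID, String> topicNames;

    private TopicBiMapSketch(Map<String, UUID> ids, Map<UUID, String> names) {
        this.topicIds = ids;
        this.topicNames = names;
    }

    // Build the reverse mapping; two names sharing one ID are rejected,
    // mirroring the exception described in the PR's javadoc.
    static TopicBiMapSketch fromTopicIds(Map<String, UUID> topicIds) {
        Map<UUID, String> names = new HashMap<>();
        for (Map.Entry<String, UUID> e : topicIds.entrySet()) {
            String previous = names.put(e.getValue(), e.getKey());
            if (previous != null) {
                throw new IllegalArgumentException("Topic ID " + e.getValue()
                        + " maps to both " + previous + " and " + e.getKey());
            }
        }
        return new TopicBiMapSketch(new HashMap<>(topicIds), names);
    }

    String nameFor(UUID id) { return topicNames.get(id); }
    UUID idFor(String name) { return topicIds.get(name); }
}
```

Failing fast at construction keeps lookups cheap and guarantees both directions of the mapping stay consistent for the lifetime of the object.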






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-09 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1130670740


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
+public short version() {
+return version;
+}
+
 public static class Builder {
 OffsetCommitResponseData data = new OffsetCommitResponseData();
 HashMap byTopicName = new 
HashMap<>();
+private final TopicResolver topicResolver;
+private final short version;
+
+public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   I actually wonder if we should do it the other way around. We could do KAFKA-14793 first, merge it, and update this one accordingly. Without KAFKA-14793, the contract of the `Builder` is not really respected, and it feels a bit weird to work around it here instead of fixing the real issue. Is KAFKA-14793 complicated? What do you think?
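The version handed to the builder in this diff is what would let it decide whether a response topic is keyed by name or by ID. A hedged sketch of that gating (field and class names are hypothetical; the real `OffsetCommitResponse.Builder` populates `OffsetCommitResponseData`, and the PR title indicates topic IDs arrive in OffsetCommit version 9):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ResponseBuilderSketch {
    // Assumed cutover version, per the PR title (topic IDs in version 9).
    static final short TOPIC_ID_MIN_VERSION = 9;

    static class Topic {
        final String name;  // null when the topic is identified by ID
        final UUID id;      // null when the topic is identified by name
        Topic(String name, UUID id) { this.name = name; this.id = id; }
    }

    private final short version;
    private final List<Topic> topics = new ArrayList<>();

    ResponseBuilderSketch(short version) { this.version = version; }

    // Keep only the identifier the negotiated version can carry.
    ResponseBuilderSketch addTopic(String name, UUID id) {
        if (version >= TOPIC_ID_MIN_VERSION) {
            topics.add(new Topic(null, id));
        } else {
            topics.add(new Topic(name, null));
        }
        return this;
    }

    List<Topic> build() { return topics; }
}
```

Threading the version through the constructor, as the diff does, keeps every `addTopic` call free of per-call version checks by the caller.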


