Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 (change apache kerby) [kafka]

2024-01-26 Thread via GitHub


highluck commented on PR #15277:
URL: https://github.com/apache/kafka/pull/15277#issuecomment-1913048227

   @divijvaidya 
   Please review the code :)





[PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 (change apache kerby) [kafka]

2024-01-26 Thread via GitHub


highluck opened a new pull request, #15277:
URL: https://github.com/apache/kafka/pull/15277

   - Changed apacheds to Apache Kerby
   - I deleted apacheds because it is no longer needed.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] Deprecate long based range queries in SessionStore [kafka]

2024-01-26 Thread via GitHub


armdave opened a new pull request, #15276:
URL: https://github.com/apache/kafka/pull/15276

   [KAFKA-13130](https://issues.apache.org/jira/browse/KAFKA-13130)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-26 Thread via GitHub


DL1231 commented on PR #15211:
URL: https://github.com/apache/kafka/pull/15211#issuecomment-1912974892

   @dajac 
   
   Addressed your review comments. PTAL when you get a chance!





[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27

2024-01-26 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811467#comment-17811467
 ] 

highluck commented on KAFKA-16066:
--

thanks [~divijvaidya] :)

> Upgrade apacheds to 2.0.0.AM27
> --
>
> Key: KAFKA-16066
> URL: https://issues.apache.org/jira/browse/KAFKA-16066
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: highluck
>Priority: Major
>  Labels: newbie, newbie++
>
> We are currently using a very old dependency. apacheds is only used for 
> testing via MiniKdc, hence there is nothing stopping us from upgrading it.
> Notably, apacheds has removed the component 
> org.apache.directory.server:apacheds-protocol-kerberos in favour of 
> Apache Kerby, hence we need to make changes in MiniKdc.scala for this 
> upgrade to work correctly.





[jira] [Assigned] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27

2024-01-26 Thread highluck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

highluck reassigned KAFKA-16066:


Assignee: highluck

> Upgrade apacheds to 2.0.0.AM27
> --
>
> Key: KAFKA-16066
> URL: https://issues.apache.org/jira/browse/KAFKA-16066
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: highluck
>Priority: Major
>  Labels: newbie, newbie++
>
> We are currently using a very old dependency. apacheds is only used for 
> testing via MiniKdc, hence there is nothing stopping us from upgrading it.
> Notably, apacheds has removed the component 
> org.apache.directory.server:apacheds-protocol-kerberos in favour of 
> Apache Kerby, hence we need to make changes in MiniKdc.scala for this 
> upgrade to work correctly.





Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-26 Thread via GitHub


DL1231 commented on code in PR #15211:
URL: https://github.com/apache/kafka/pull/15211#discussion_r1468309807


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,6 +747,12 @@ public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
     return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
 }
 
+@Override
+public boolean isInStatesCaseInsensitive(List<String> statesFilter, long committedOffset) {
+    Set<String> caseInsensitiveFilterSet = statesFilter.stream().map(String::toLowerCase).map(String::trim).collect(Collectors.toSet());

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -57,10 +57,15 @@ public String toString() {
  */
 String stateAsString(long committedOffset);
 
+/**
+ * @return The {@link GroupType}'s LowerCase String representation based on the committed offset.
+ */
+String stateAsLowerCaseString(long committedOffset);

Review Comment:
   Done.
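For readers skimming the thread, here is a minimal, self-contained sketch of the case-insensitive state matching these hunks implement; the surrounding class is invented for illustration, and only the normalize-then-compare idea is taken from the diff:

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class StateFilterExample {
    // Mirrors the diff: normalize the user-supplied filter once (trim +
    // lower-case), then compare against the group's lower-cased state.
    static boolean isInStates(String currentState, List<String> statesFilter) {
        Set<String> normalized = statesFilter.stream()
                .map(String::trim)
                .map(s -> s.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
        return normalized.contains(currentState.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isInStates("Stable", List.of(" stable ", "EMPTY"))); // true
    }
}
```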






Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-26 Thread via GitHub


DL1231 commented on code in PR #15211:
URL: https://github.com/apache/kafka/pull/15211#discussion_r1468309502


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupState.java:
##
@@ -114,15 +115,20 @@ public enum ClassicGroupState {
 DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD);
 }
 
-ClassicGroupState(String name) {
+ClassicGroupState(String name, String lowerCaseName) {
 this.name = name;
+this.lowerCaseName = lowerCaseName;
 }
 
 @Override
 public String toString() {
 return name;
 }
 
+public String toLowerCaseString() {

Review Comment:
   Thanks for the suggestion, I added `toCapitalizedString` to 
`ConsumerGroup.ConsumerGroupState`.
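The pattern under discussion, sketched as a standalone enum. Note the real ClassicGroupState diff passes the lower-case name into the constructor explicitly, while this sketch derives it; that is a stylistic assumption, not the merged code:

```java
public enum ExampleState {
    STABLE("Stable"),
    EMPTY("Empty");

    private final String name;
    private final String lowerCaseName;

    ExampleState(String name) {
        this.name = name;
        // Precomputed once per constant instead of on every request.
        this.lowerCaseName = name.toLowerCase(java.util.Locale.ROOT);
    }

    @Override
    public String toString() {
        return name;
    }

    public String toLowerCaseString() {
        return lowerCaseName;
    }
}
```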






Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-26 Thread via GitHub


DL1231 commented on code in PR #15211:
URL: https://github.com/apache/kafka/pull/15211#discussion_r1468299153


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupState.java:
##
@@ -114,15 +115,20 @@ public enum ClassicGroupState {
 DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD);
 }
 
-ClassicGroupState(String name) {
+ClassicGroupState(String name, String lowerCaseName) {

Review Comment:
   Done.






Re: [PR] Change `CreateTopicPolicy.RequestMetadata` to always provide partitions, replication factor and replica assignments. [kafka]

2024-01-26 Thread via GitHub


github-actions[bot] commented on PR #14655:
URL: https://github.com/apache/kafka/pull/14655#issuecomment-1912947638

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16156:
--
Component/s: consumer

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at 
> 

[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16156:
--
Labels: consumer-threading-refactor  (was: )

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at 
> 

[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16113:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total
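As a rough sketch of what wiring these up could look like with Kafka's common metrics library; the sensor name, metric group, and descriptions here are assumptions, not the consumer's actual internal wiring:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;

public class CommitMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "consumer-coordinator-metrics"; // group name assumed

        Sensor commitSensor = metrics.sensor("commit-latency");
        commitSensor.add(metrics.metricName("commit-latency-avg", group,
                "Average commit latency in ms"), new Avg());
        commitSensor.add(metrics.metricName("commit-latency-max", group,
                "Max commit latency in ms"), new Max());
        // Meter registers the rate and the cumulative total together.
        commitSensor.add(new Meter(
                metrics.metricName("commit-rate", group, "Commits per second"),
                metrics.metricName("commit-total", group, "Total number of commits")));

        commitSensor.record(12.0); // one commit that took 12 ms
        metrics.close();
    }
}
{code}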





[jira] [Updated] (KAFKA-16199) Prune the event queue if events have expired before starting

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Parent: (was: KAFKA-15974)
Issue Type: Improvement  (was: Sub-task)

> Prune the event queue if events have expired before starting
> 
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Kirk True (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16190 ]


Kirk True deleted comment on KAFKA-16190:
---

was (Author: kirktrue):
[~lianetm] is this something we want in 3.8?

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced should rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case, given that it explicitly changes the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.
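A deliberately hypothetical sketch of the fix described above; none of these names are the real HeartbeatRequestManager internals, they only illustrate the reset-on-rejoin idea:

{code:java}
import java.util.HashSet;
import java.util.Set;

public class HeartbeatStateSketch {
    private final Set<String> sentFields = new HashSet<>();

    // Called when the member is fenced and rejoins: wiping the "already sent"
    // state forces every field (including the subscription) into the next
    // heartbeat instead of a delta.
    public void onRejoin() {
        sentFields.clear();
    }

    // True the first time a field is sent after a reset.
    public boolean shouldSend(String field) {
        return sentFields.add(field);
    }
}
{code}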





[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16200:
--
Parent: (was: KAFKA-15974)
Issue Type: Improvement  (was: Sub-task)

> Ensure RequestManager handling of expired timeouts are consistent
> -
>
> Key: KAFKA-16200
> URL: https://issues.apache.org/jira/browse/KAFKA-16200
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16199) Prune the event queue if events have expired before starting

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Priority: Critical  (was: Major)

> Prune the event queue if events have expired before starting
> 
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16185:
--
Fix Version/s: 3.8.0

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending that has not been removed by the 
> coordinator.
> There is still an edge case where this does not happen, and the client might 
> get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and 
> it receives the same assignment, but in a new epoch (ex. after being FENCED). 
> The first time it receives the assignment it takes no action, as it already 
> has it pending to reconcile, but when the reconciliation completes it 
> discards the result because the epoch changed, which is wrong. Note that 
> after sending the assignment with the new epoch once, the broker continues 
> to send null assignments. 
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3), client receives 
> same assignment that is currently trying to reconcile (tp1), but with epoch 3
> - previous reconciliation completes, but will discard the result because it 
> will notice that the memberHasRejoined (memberEpochOnReconciliationStart != 
> memberEpoch). Client is stuck JOINING, with the server sending null target 
> assignment because it hasn't changed since the last one sent (tp1)
> (We should end up with a test similar to the existing 
> #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case 
> that the member receives the same assignment after being fenced and rejoining)
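A hedged sketch of the discard decision described above, with all names assumed; comparing assignments rather than epochs alone would keep the result when the same target is re-sent under a bumped epoch:

{code:java}
import java.util.Set;

public class ReconciliationSketch {
    // Comparing assignments, not just epochs, keeps a completed reconciliation
    // when the same target (e.g. tp1) is re-sent under a bumped epoch.
    static boolean shouldDiscard(int epochAtStart, int currentEpoch,
                                 Set<String> reconciled, Set<String> currentTarget) {
        boolean memberHasRejoined = epochAtStart != currentEpoch;
        return memberHasRejoined && !reconciled.equals(currentTarget);
    }

    public static void main(String[] args) {
        Set<String> tp1 = Set.of("tp1");
        // Epoch bumped 1 -> 3 but same target: keep the result, don't get stuck JOINING.
        System.out.println(shouldDiscard(1, 3, tp1, tp1)); // false
    }
}
{code}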





[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16190:
--
Fix Version/s: 3.8.0

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced should rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case, given that it explicitly changes the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.





[jira] [Updated] (KAFKA-16169) FencedException in commitAsync not propagated without callback

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16169:
--
Fix Version/s: 3.8.0

> FencedException in commitAsync not propagated without callback
> --
>
> Key: KAFKA-16169
> URL: https://issues.apache.org/jira/browse/KAFKA-16169
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> The javadocs for {{commitAsync()}} (w/o callback) say:
> @throws org.apache.kafka.common.errors.FencedInstanceIdException if this 
> consumer instance gets fenced by broker.
>  
> If no callback is passed into {{{}commitAsync(){}}}, no offset commit 
> callback invocation is submitted. However, we only check for a 
> {{FencedInstanceIdException}} when we execute a callback. It seems to me that 
> with {{commitAsync()}} we would not throw at all when the consumer gets 
> fenced.
> In any case, we need a unit test that verifies that the 
> {{FencedInstanceIdException}} is thrown for each version of 
> {{{}commitAsync(){}}}.
>  
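A sketch of the kind of unit test the ticket asks for; the fixture that produces a fenced consumer is assumed and omitted, so this shows the shape rather than a working test:

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;

class CommitAsyncFencedSketch {
    // Assumed fixture: a consumer whose group.instance.id has already been
    // fenced by the broker. The wiring is omitted from this sketch.
    private Consumer<String, String> prepareFencedConsumer() {
        throw new UnsupportedOperationException("fixture omitted");
    }

    @Test
    void commitAsyncWithoutCallbackStillSurfacesFencedError() {
        Consumer<String, String> consumer = prepareFencedConsumer();
        consumer.commitAsync(); // no callback registered
        // Per the javadoc quoted above, the fenced error should still surface,
        // for example from a subsequent consumer call.
        assertThrows(FencedInstanceIdException.class, () -> consumer.poll(Duration.ZERO));
    }
}
{code}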





[jira] [Updated] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16178:
--
Fix Version/s: 3.8.0

> AsyncKafkaConsumer doesn't retry joining the group after rediscovering group 
> coordinator
> 
>
> Key: KAFKA-16178
> URL: https://issues.apache.org/jira/browse/KAFKA-16178
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Dongnuo Lyu
>Assignee: Philip Nee
>Priority: Critical
>  Labels: client-transitions-issues, consumer-threading-refactor
> Fix For: 3.8.0
>
> Attachments: pkc-devc63jwnj_jan19_0_debug
>
>
> {code:java}
> [2024-01-17 21:34:59,500] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the 
> group coordinator 
> Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
> 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator 
> again and retry in 0ms: This is not the correct coordinator. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Group coordinator 
> b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: 
> null) is unavailable or invalid due to cause: This is not the correct 
> coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136)
> [2024-01-17 21:34:59,882] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code}
> Some of the consumers don't consume any messages. The logs show that after the 
> consumer starts up and successfully logs in:
>  # The consumer discovers the group coordinator.
>  # The heartbeat to join the group fails because "This is not the correct 
> coordinator".
>  # The consumer rediscovers the group coordinator.
> Another heartbeat should follow the rediscovery of the group coordinator, but 
> there are no logs showing any sign of a heartbeat request. 
> On the server side, there are no logs at all about the group id. A 
> suspicion is that the consumer doesn't send a heartbeat request after 
> rediscovering the group coordinator.





[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811431#comment-17811431
 ] 

Kirk True commented on KAFKA-16190:
---

[~lianetm] is this something we want in 3.8?

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced should rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case, given that it explicitly changes the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.





[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Parent: (was: KAFKA-15974)
Issue Type: Bug  (was: Sub-task)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(
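One hedged direction implied by the comparison above, though not a committed fix, is to mirror the legacy ordering and check for an already-completed result before enforcing a zero timeout:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CheckBeforeTimerSketch {
    // Not the real fix, just the ordering idea: poll(0) succeeds when the
    // result is already available instead of failing inside Future.get(0, ...).
    static int getCount(CompletableFuture<Integer> future, long remainingMs)
            throws ExecutionException, InterruptedException {
        try {
            if (future.isDone())
                return future.get(); // honor completed work even at 0 ms
            return future.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        }
    }
}
{code}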





[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Parent: (was: KAFKA-15974)
Issue Type: Bug  (was: Sub-task)

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If a request times out in the background thread, it will be completed with a 
> TimeoutException, which is retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the inflight requests when they 
> expire.
>  # Pass the future to the application layer and continue to retry.
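A minimal sketch of option 1, with the inflight bookkeeping invented for illustration: expire, rather than retry, any request whose user-supplied deadline has passed:

{code:java}
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.errors.TimeoutException;

public class InflightExpirySketch {
    // Hypothetical inflight entry: the pending future plus an absolute deadline.
    static final class Inflight {
        final CompletableFuture<Void> future;
        final long deadlineMs;

        Inflight(CompletableFuture<Void> future, long deadlineMs) {
            this.future = future;
            this.deadlineMs = deadlineMs;
        }
    }

    // Expire, rather than retry, any request whose deadline has passed.
    static void expireInflight(List<Inflight> inflight, long nowMs) {
        Iterator<Inflight> it = inflight.iterator();
        while (it.hasNext()) {
            Inflight r = it.next();
            if (nowMs >= r.deadlineMs) {
                r.future.completeExceptionally(new TimeoutException("user API timeout elapsed"));
                it.remove();
            }
        }
    }
}
{code}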





[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15305:
--
Parent: (was: KAFKA-15974)
Issue Type: Bug  (was: Sub-task)

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304:
> the close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't honor that: when close() is initiated, it immediately 
> closes all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct thing to do is to first stop 
> accepting API requests, second, let runOnce() continue to run until the 
> shutdown timer expires, and then force-close all of its dependencies.
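A hedged sketch of that proposed ordering, with stand-in method names since the real background thread's internals are not shown here:

{code:java}
public class GracefulShutdownSketch {
    private volatile boolean acceptingRequests = true;

    // Stand-ins for the background thread's real work; assumed, not actual internals.
    private boolean hasPendingTasks() { return false; }
    private void runOnce() { }
    private void forceCloseDependencies() { }

    // 1. stop taking new API requests, 2. drain remaining work until the
    // caller's close timeout expires, 3. force-close whatever is left.
    public void close(long timeoutMs) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        acceptingRequests = false;
        while (hasPendingTasks() && System.currentTimeMillis() < deadlineMs) {
            runOnce();
        }
        forceCloseDependencies();
    }
}
{code}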





[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16115:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|





[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16190:
--
Labels: client-transitions-issues kip-848-client-support newbie  (was: 
client-transitions-issues newbie)

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced should rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case, given that it explicitly changes the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.





[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16185:
--
Labels: client-transitions-issues kip-848-client-support  (was: 
client-transitions-issues)

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending that has not been removed by the 
> coordinator.
> There is still an edge case where this does not happen, and the client might 
> get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and 
> it receives the same assignment, but in a new epoch (ex. after being FENCED). 
> The first time it receives the assignment it takes no action, as it already 
> has it pending to reconcile, but when the reconciliation completes it 
> discards the result because the epoch changed, which is wrong. Note that 
> after sending the assignment with the new epoch once, the broker continues 
> to send null assignments. 
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3), client receives 
> same assignment that is currently trying to reconcile (tp1), but with epoch 3
> - previous reconciliation completes, but will discard the result because it 
> will notice that the memberHasRejoined (memberEpochOnReconciliationStart != 
> memberEpoch). Client is stuck JOINING, with the server sending null target 
> assignment because it hasn't changed since the last one sent (tp1)
> (We should end up with a test similar to the existing 
> #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case 
> that the member receives the same assignment after being fenced and rejoining)





[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Parent: (was: KAFKA-15974)
Issue Type: Improvement  (was: Sub-task)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.
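One possible shape for the refactor, sketched under the assumption that events expose their remaining timeout; CompletableFuture.orTimeout (Java 9+) does the uniform enforcement:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimedEventSketch {
    // Attach the caller's timeout to the event's future in one place, so every
    // CompletableApplicationEvent times out the same way.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> event, long timeoutMs) {
        return event.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
{code}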





Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-26 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1912805679

   Merged to `trunk` and cherry-picked to `3.7` branch.





Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-26 Thread via GitHub


mjsax merged PR #15151:
URL: https://github.com/apache/kafka/pull/15151





Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-26 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1912803502

   Only `StreamsStandbyTask` failed, which was fixed recently via 
https://github.com/apache/kafka/pull/15217. Merging.





Re: [PR] KAFKA-14404: fix overlap of streams-config sections & describe additional parameters [kafka]

2024-01-26 Thread via GitHub


mjsax commented on PR #15162:
URL: https://github.com/apache/kafka/pull/15162#issuecomment-1912801281

   Thanks for pushing this over the finish line @AyoubOm! Great PR.





Re: [PR] Minor update to KafkaApisTest [kafka]

2024-01-26 Thread via GitHub


jolshan merged PR #15257:
URL: https://github.com/apache/kafka/pull/15257





Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-26 Thread via GitHub


C0urante commented on code in PR #15249:
URL: https://github.com/apache/kafka/pull/15249#discussion_r1468141005


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -238,9 +238,6 @@ public void testBrokerCoordinator() throws Exception {
 
 connect.kafka().stopOnlyKafka();
 
-connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-        "Group of workers did not remain the same after broker shutdown");
-

Review Comment:
   This check doesn't seem necessary. A distributed Connect worker does become 
unavailable when the underlying Kafka cluster is unavailable, and the existing 
check (that a request to `GET /` is successful) doesn't really prove much about 
worker liveness.






Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-26 Thread via GitHub


C0urante commented on code in PR #15249:
URL: https://github.com/apache/kafka/pull/15249#discussion_r1468141005


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -238,9 +238,6 @@ public void testBrokerCoordinator() throws Exception {
 
 connect.kafka().stopOnlyKafka();
 
-connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-        "Group of workers did not remain the same after broker shutdown");
-

Review Comment:
   This check doesn't seem necessary, and fails now if left in. A distributed 
Connect worker does become unavailable when the underlying Kafka cluster is 
unavailable, and the existing check (that a request to `GET /` is successful) 
doesn't really prove much about worker liveness.






[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Remaining Estimate: 40h  (was: 120h)
 Original Estimate: 40h  (was: 120h)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.





[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Remaining Estimate: 120h
 Original Estimate: 120h

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.





[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Parent: KAFKA-15974
Issue Type: Sub-task  (was: Bug)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(





[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15305:
--
Parent: KAFKA-15974
Issue Type: Sub-task  (was: Improvement)

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304 we noticed
> that the close() API supplies a timeout parameter so that the consumer can
> have a grace period to process things before shutting down.  The background
> thread currently doesn't do that: when close() is initiated, it immediately
> closes all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct thing to do is to first stop
> accepting API requests; second, let runOnce() continue to run until the
> shutdown timer expires; then we can force-close all of its dependencies.
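
For illustration, a minimal sketch of that ordering; the names used here (acceptingRequests, hasRemainingTasks, requestManagers) are assumptions, not the real implementation:
{code:java}
void close(Timer timer) {
    acceptingRequests = false;                       // 1. stop accepting new API requests
    while (timer.notExpired() && hasRemainingTasks()) {
        runOnce();                                   // 2. keep processing remaining tasks
        timer.update();
    }
    Utils.closeQuietly(requestManagers, "request managers"); // 3. force-close what's left
}
{code}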



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Parent: KAFKA-15974
Issue Type: Sub-task  (was: Bug)

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with
> a TimeoutException, which is retriable.  In the TopicMetadataRequestManager
> and possibly other managers, the request might then continue to be retried
> forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests once they
> have expired.
>  # Pass the future to the application layer and continue to retry.
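
A minimal sketch of option 1, with all names assumed, where the manager drops inflight requests whose deadline has passed:
{code:java}
void expireInflightRequests(long currentTimeMs) {
    inflightRequests.removeIf(request -> {
        if (request.deadlineMs() <= currentTimeMs) {
            request.future().completeExceptionally(
                new TimeoutException("Request expired before a response was received"));
            return true;  // removed from inflight, so it will not be retried forever
        }
        return false;
    });
}
{code}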



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeouts

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Description: The intention of the {{CompletableApplicationEvent}} is for a 
{{Consumer}} to block waiting for the event to complete. The application thread 
will block for the timeout, but there is not yet a consistent manner in which 
events are timed out.  (was: The intention of the 
{{CompletableApplicationEvent}} is for a {{Consumer}} to block waiting for the 
event to complete. The application thread will block for the timeout, but there 
is not yet a consistent manner in which events are timed out.

Steps:
 # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s
 # Prune the event queue if events have expired before starting
 # Canceled by the background thread if the event expired after starting)

> Enforce that events and requests respect user-provided timeouts
> ---
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent

2024-01-26 Thread Kirk True (Jira)
Kirk True created KAFKA-16200:
-

 Summary: Ensure RequestManager handling of expired timeouts are 
consistent
 Key: KAFKA-16200
 URL: https://issues.apache.org/jira/browse/KAFKA-16200
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16199) Prune the event queue if events have expired before starting

2024-01-26 Thread Kirk True (Jira)
Kirk True created KAFKA-16199:
-

 Summary: Prune the event queue if events have expired before 
starting
 Key: KAFKA-16199
 URL: https://issues.apache.org/jira/browse/KAFKA-16199
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Summary: Add timeout to all the CompletableApplicationEvents  (was: 
Consistent handling of timeouts and responses for new consumer 
ApplicationEvents)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeouts

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Summary: Enforce that events and requests respect user-provided timeouts  
(was: Enforce that CompletableApplicationEvent has a timeout that is respected)

> Enforce that events and requests respect user-provided timeouts
> ---
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
> Steps:
>  # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s
>  # Prune the event queue if events have expired before starting
>  # Canceled by the background thread if the event expired after starting
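
As a sketch of step 1 only (constructor and field names are simplified assumptions, not the actual class), the event could carry its own deadline so the queue can be pruned and in-flight events cancelled:
{code:java}
abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    private final long deadlineMs;

    CompletableApplicationEvent(Type type, long deadlineMs) {
        super(type);
        this.deadlineMs = deadlineMs;
    }

    boolean isExpired(long currentTimeMs) {
        return currentTimeMs >= deadlineMs;  // used to prune the queue or cancel the event
    }

    CompletableFuture<T> future() {
        return future;
    }
}
{code}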



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Parent: KAFKA-15974
Issue Type: Sub-task  (was: Improvement)

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15974) Enforce that CompletableApplicationEvent has a timeout that is respected

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Description: 
The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
block waiting for the event to complete. The application thread will block for 
the timeout, but there is not yet a consistent manner in which events are timed 
out.

Steps:
 # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s
 # Prune the event queue if events have expired before starting
 # Canceled by the background thread if the event expired after starting

  was:
The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
block waiting for the event to complete. The application thread will block for 
the timeout, but there is not yet a consistent manner in which events are timed 
out.

Steps:

1. Add the timeout to all the {{CompletableApplicationEvent}}s
2. Prune the event queue if events have expired before starting
3. Canceled by the background thread if the event expired after starting


> Enforce that CompletableApplicationEvent has a timeout that is respected
> 
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
> Steps:
>  # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s
>  # Prune the event queue if events have expired before starting
>  # Canceled by the background thread if the event expired after starting



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]

2024-01-26 Thread via GitHub


hgeraldino commented on code in PR #14663:
URL: https://github.com/apache/kafka/pull/14663#discussion_r1468126514


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class WorkerSinkTaskMockitoTest {
+// These are fixed to keep this code simpler. In this example we assume 
byte[] raw values
+// with mix of integer/string in Connect
+private static final String TOPIC = "test";
+private static final int PARTITION = 12;
+private static final int PARTITION2 = 13;
+private static final int PARTITION3 = 14;
+private static final long FIRST_OFFSET = 45;
+private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+private static final int 

[jira] [Updated] (KAFKA-15974) Enforce that CompletableApplicationEvent has a timeout that is respected

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Priority: Blocker  (was: Major)

> Enforce that CompletableApplicationEvent has a timeout that is respected
> 
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
> Steps:
> 1. Add the timeout to all the {{CompletableApplicationEvent}}s
> 2. Prune the event queue if events have expired before starting
> 3. Canceled by the background thread if the event expired after starting



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Component/s: consumer

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Priority: Blocker  (was: Major)

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16122) TransactionsBounceTest -- server disconnected before response was received

2024-01-26 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-16122.

Resolution: Fixed

> TransactionsBounceTest -- server disconnected before response was received
> --
>
> Key: KAFKA-16122
> URL: https://issues.apache.org/jira/browse/KAFKA-16122
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> I noticed a ton of tests failing with:
> {code:java}
> Error  org.apache.kafka.common.KafkaException: Unexpected error in 
> TxnOffsetCommitResponse: The server disconnected before a response was 
> received.  {code}
> {code:java}
> Stacktrace  org.apache.kafka.common.KafkaException: Unexpected error in 
> TxnOffsetCommitResponse: The server disconnected before a response was 
> received.  at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1702)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>   at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) 
>  at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:457)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:334)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:249)  
> at java.base@21.0.1/java.lang.Thread.run(Thread.java:1583) {code}
> The error indicates a network error which is retriable but the 
> TxnOffsetCommit handler doesn't expect this. 
> https://issues.apache.org/jira/browse/KAFKA-14417 addressed many of the other 
> requests but not this one. 
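
Conceptually (a simplified sketch, not the actual patch in the linked PR), the handler needs a retriable branch for this case instead of falling through to the fatal path; `reenqueue()` and `fatalError()` are assumed helpers:
{code:java}
void handleError(Errors error) {
    if (error.exception() instanceof RetriableException) {
        reenqueue();   // retry the TxnOffsetCommit request instead of failing fatally
    } else {
        fatalError(new KafkaException(
            "Unexpected error in TxnOffsetCommitResponse: " + error.message()));
    }
}
{code}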



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16122: Handle retriable errors in TxnOffsetCommits [kafka]

2024-01-26 Thread via GitHub


jolshan merged PR #15266:
URL: https://github.com/apache/kafka/pull/15266


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]

2024-01-26 Thread via GitHub


dajac commented on PR #15253:
URL: https://github.com/apache/kafka/pull/15253#issuecomment-1912511055

   cc @nizhikov @jolshan FYI - I have a few minor changes related to the 
consumer group command here, as I saw that you're working on migrating it to 
Java.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15987) Refactor ReplicaManager code for transaction verification

2024-01-26 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-15987.

Resolution: Fixed

> Refactor ReplicaManager code for transaction verification
> -
>
> Key: KAFKA-15987
> URL: https://issues.apache.org/jira/browse/KAFKA-15987
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> I started to do this in KAFKA-15784, but the diff was deemed too large and 
> confusing. I just wanted to file a followup ticket to reference this in code 
> for the areas that will be refactored.
>  
> I hope to tackle it immediately after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-26 Thread via GitHub


jolshan merged PR #15248:
URL: https://github.com/apache/kafka/pull/15248


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


dajac commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467998324


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 }
 
 /**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
+ * This is the handler commonly used by all the operations that require
+ * converting errors to coordinator errors. The handler also handles and logs
+ * unexpected errors.
+ *
+ * @param requestName   The name of the request.
+ * @param request   The request itself for logging purposes.
+ * @param exception The exception to handle.
+ * @param responseBuilder   A function which takes an Errors and a String 
and returns
+ *  the response. The String can be null.
+ * @return The response.
+ * @param  The type of the request.

Review Comment:
   Yeah, I agree. The types do not actually matter because the request is only 
used for logging and the response is only passed from the handler to the 
caller. It perhaps means that the structure of the helper is not right. Let me 
see if I can find an alternative.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


dajac commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467996373


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -360,14 +334,13 @@ public CompletableFuture joinGroup(
 Duration.ofMillis(config.offsetCommitTimeoutMs),
 coordinator -> coordinator.classicGroupJoin(context, request, 
responseFuture)
 ).exceptionally(exception -> {
-if (!(exception instanceof KafkaException)) {

Review Comment:
   Sorry, I did not get your point initially. The errors for those APIs are 
also converted, but that happens within `classicGroupJoin` based on the same 
scheme, so doing it here does not hurt. As a follow-up, I'd like to remove the 
translation there in order to have it centralized here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-26 Thread via GitHub


jolshan commented on PR #15248:
URL: https://github.com/apache/kafka/pull/15248#issuecomment-1912499332

   Sorry I meant to check this one yesterday but accidentally looked at your 
other PR. I thought I was waiting on build issues there, but should have been 
looking here. Looking now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912485870

   Hey @lucasbru, thanks for the changes, not only fixing the bug but also 
moving in the direction we want of internally spreading the use of topicIds. 
Left a few comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912485446

   Just to clarify, regarding the last note in the PR description ("if a new 
assignment or new metadata arrives during an ongoing reconciliation, it will 
never be applied"): maybe I'm missing something you're seeing, but the current 
logic does apply new assignments received or discovered while there is an 
ongoing reconciliation. The main issue to improve with 
[KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) is not having 
to rely on 2 triggers, or on receiving the (same) assignment from the broker 
(being able then to better handle edge cases where receiving the same 
assignment in a different epoch does not trigger reconciliation). Maybe I'm 
missing something, but if not, I would suggest we just remove that note?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]

2024-01-26 Thread via GitHub


lucasbru opened a new pull request, #15275:
URL: https://github.com/apache/kafka/pull/15275

   Currently, the reconciliation logic on the client is triggered when a new 
target assignment is received and resolved, or when new unresolved target 
assignments are discovered in metadata.
   
   This change improves on that by triggering the reconciliation logic on each 
poll iteration, to reconcile whatever is ready to be reconciled. This would 
require changes to support poll on the MembershipManager, and to integrate it 
with the current polling logic in the background thread. Receiving a new target 
assignment from the broker, or resolving new topic names via a metadata update, 
would then only ensure that #assignmentReadyToReconcile is properly updated 
(currently done), but wouldn't trigger the #reconcile() logic, leaving that to 
the #poll() operation.
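
   As a rough sketch only (the state, field, and constant names here are 
assumptions), the membership manager could expose something like:
   ```java
   // Sketch: reconcile whatever is ready on every background-thread poll.
   public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
       if (state == MemberState.RECONCILING && !assignmentReadyToReconcile.isEmpty()) {
           reconcile();
       }
       return NetworkClientDelegate.PollResult.EMPTY;
   }
   ```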
   
   Stacked PR, do not review the first commit.
   
   Draft - still working out some details and some tests missing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-26 Thread via GitHub


jolshan merged PR #15087:
URL: https://github.com/apache/kafka/pull/15087


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467835947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1340,18 +1336,15 @@ boolean reconciliationInProgress() {
  * When cluster metadata is updated, try to resolve topic names for topic 
IDs received in
  * assignment that hasn't been resolved yet.
  * 
- * Try to find topic names for all unresolved assignments
+ * Try to find topic names for all assignments
  * Add discovered topic names to the local topic names cache
  * If any topics are resolved, trigger a reconciliation 
process
  * If some topics still remain unresolved, request another 
metadata update
  * 
  */
 @Override
 public void onUpdate(ClusterResource clusterResource) {
-resolveMetadataForUnresolvedAssignment();
-if (!assignmentReadyToReconcile.isEmpty()) {
-reconcile();
-}
+reconcile();

Review Comment:
   We should probably reconcile here only if we're in the RECONCILING state, I 
would say. We could receive metadata updates anytime. With the current 
reconcile implementation I expect nothing bad would happen by calling 
reconcile on a metadata update when there is nothing to reconcile or we are in 
some other state/transition, but it's probably clearer/safer to reason about it 
with a short-circuit here to avoid unexpected effects/logs?
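
   Something like this rough sketch is what I have in mind (assuming the 
existing MemberState enum and state field):
   ```java
   @Override
   public void onUpdate(ClusterResource clusterResource) {
       // Short-circuit: only attempt reconciliation when one is actually due.
       if (state == MemberState.RECONCILING) {
           reconcile();
       }
   }
   ```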



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467947528


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1311,21 +1294,34 @@ public Map> 
currentAssignment() {
 return this.currentAssignment;
 }
 
-
 /**
  * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
- * because topic names are not in metadata. Visible for testing.
+ * because topic names are not in metadata or reconciliation hasn't 
finished. Visible for testing.
  */
-Set topicsWaitingForMetadata() {
-return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+Set topicsAwaitingReconciliation() {

Review Comment:
   We are slightly changing the concept of what this does, and I'm fine with 
that because it works for how it's used in the tests, but I think the 
description/name/implementation are not well aligned. This is not strictly 
getting topicsAwaitingReconciliation ("not in metadata or reconciliation hasn't 
finished"); ex. a member owns one partition of a topic, and another partition 
of it is being added and stuck reconciling (committing offsets, revoking, 
etc.): that topic wouldn't be returned here, but it's indeed a 
`topicsAwaitingReconciliation` (for which "reconciliation hasn't finished"). 
What about we just rely on the `topicPartitionsAwaitingReconciliation` you 
defined below (that seems accurate to me), and here we just `return 
topicPartitionsAwaitingReconciliation().keySet();`?
   I do know this is not causing trouble because of how it is used in the tests 
at the moment, but better to get it right to avoid confusion/misuse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467940534


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 return groupId != null && !groupId.isEmpty();
 }
 
-/**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
- */
-private static Errors normalizeException(Throwable exception) {
-exception = Errors.maybeUnwrapException(exception);
-
-if (exception instanceof UnknownTopicOrPartitionException ||
-exception instanceof NotEnoughReplicasException ||
-exception instanceof TimeoutException) {
-return Errors.COORDINATOR_NOT_AVAILABLE;
-}
-
-if (exception instanceof NotLeaderOrFollowerException ||
-exception instanceof KafkaStorageException) {
-return Errors.NOT_COORDINATOR;
-}
-
-if (exception instanceof RecordTooLargeException ||
-exception instanceof RecordBatchTooLargeException ||
-exception instanceof InvalidFetchSizeException) {
-return Errors.UNKNOWN_SERVER_ERROR;
+private <REQ, RSP> RSP handleOperationException(
+String requestName,
+REQ request,
+Throwable exception,
+BiFunction<Errors, String, RSP> responseBuilder
+) {
+ApiError apiError = ApiError.fromThrowable(exception);
+
+switch (apiError.error()) {
+case UNKNOWN_SERVER_ERROR:
+log.error("{} request {} hit an unexpected exception: {}.",
+requestName, request, exception.getMessage(), exception);
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+case UNKNOWN_TOPIC_OR_PARTITION:
+case NOT_ENOUGH_REPLICAS:
+case REQUEST_TIMED_OUT:
+return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, 
null);
+
+case NOT_LEADER_OR_FOLLOWER:
+case KAFKA_STORAGE_ERROR:
+return responseBuilder.apply(Errors.NOT_COORDINATOR, null);
+
+case MESSAGE_TOO_LARGE:
+case RECORD_LIST_TOO_LARGE:
+case INVALID_FETCH_SIZE:
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+default:
+return responseBuilder.apply(apiError.error(), 
apiError.message());

Review Comment:
   Cool. The alternative is that we aren't returning the default string for the 
exception. I believe that is the case, but just wanted to confirm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467939251


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -524,37 +495,39 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
 );
 }
 
-final CompletableFuture<ListGroupsResponseData> future = new 
CompletableFuture<>();
-final List<ListGroupsResponseData.ListedGroup> results = new 
ArrayList<>();
 final Set existingPartitionSet = runtime.partitions();
-final AtomicInteger cnt = new 
AtomicInteger(existingPartitionSet.size());
 
 if (existingPartitionSet.isEmpty()) {
 return CompletableFuture.completedFuture(new 
ListGroupsResponseData());
 }
 
+final 
List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures =
+new ArrayList<>();
+
 for (TopicPartition tp : existingPartitionSet) {
-runtime.scheduleReadOperation(
+futures.add(runtime.scheduleReadOperation(
 "list-groups",
 tp,
 (coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
-).handle((groups, exception) -> {
-if (exception == null) {
-synchronized (results) {
-results.addAll(groups);
-}
+).exceptionally(exception -> {
+exception = Errors.maybeUnwrapException(exception);

Review Comment:
   I see. Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467938411


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -360,14 +334,13 @@ public CompletableFuture joinGroup(
 Duration.ofMillis(config.offsetCommitTimeoutMs),
 coordinator -> coordinator.classicGroupJoin(context, request, 
responseFuture)
 ).exceptionally(exception -> {
-if (!(exception instanceof KafkaException)) {

Review Comment:
   Hmm -- in the KafkaException case we are converting errors though -- so the 
error handling for those seems to be changing, right? If we return 
unknown_topic_or_partition we will convert the response to coordinator not 
available. 
   
   Are you saying the error handling doesn't change because all the other 
errors are not expected to be returned, and if they somehow do and get 
converted then it doesn't matter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-26 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467936095


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 }
 
 /**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
+ * This is the handler commonly used by all the operations that require
+ * converting errors to coordinator errors. The handler also handles and logs
+ * unexpected errors.
+ *
+ * @param requestName   The name of the request.
+ * @param request   The request itself for logging purposes.
+ * @param exception The exception to handle.
+ * @param responseBuilder   A function which takes an Errors and a String 
and returns
+ *  the response. The String can be null.
+ * @return The response.
+ * @param <REQ> The type of the request.

Review Comment:
   Hmmm, I see. It is a little confusing to me: if I were trying to add a new 
request/response, it is not clear exactly what needs to be supplied and what 
comes out. But maybe that's something I would need to look into more, and most 
people would understand.
   
   It seems like REQ is really just used for logging the incoming info when 
there is an error?
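
   For my own understanding, a hypothetical call site (request name and 
response type picked arbitrarily, not from this PR) would look something like:
   ```java
   return future.exceptionally(exception -> handleOperationException(
       "consumer-group-heartbeat",   // requestName, used only for logging
       request,                      // the request itself, also only for logging
       exception,
       (error, message) -> new ConsumerGroupHeartbeatResponseData()
           .setErrorCode(error.code())
           .setErrorMessage(message) // message may be null
   ));
   ```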



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-26 Thread via GitHub


mimaison opened a new pull request, #15274:
URL: https://github.com/apache/kafka/pull/15274

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-26 Thread via GitHub


fvaleri opened a new pull request, #15273:
URL: https://github.com/apache/kafka/pull/15273

   Refactoring related to #14847.
   
   The following properties are migrated from KafkaConfig (core):
   
   - LogConfig (storage)
 - log.dir
  - log.dirs
 - metadata.log.dir
 - inter.broker.protocol.version
 - unstable.api.versions.enable
 - unstable.metadata.versions.enable
   
   - RaftConfig (raft)
 - node.id
 - process.roles
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467868971


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1010,17 +1000,24 @@ private void resolveMetadataForUnresolvedAssignment() {
 Optional nameFromMetadata = 
findTopicNameInGlobalOrLocalCache(topicId);
 nameFromMetadata.ifPresent(resolvedTopicName -> {
 // Name resolved, so assignment is ready for reconciliation.
-addToAssignmentReadyToReconcile(topicId, resolvedTopicName, 
topicPartitions);
+topicPartitions.forEach(tp -> {
+TopicIdPartition topicIdPartition = new TopicIdPartition(
+topicId,
+new TopicPartition(resolvedTopicName, tp));

Review Comment:
   Indentation is off here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467868408


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -997,11 +985,13 @@ void markReconciliationCompleted() {
  * 
  * 
  */
-private void resolveMetadataForUnresolvedAssignment() {
-assignmentReadyToReconcile.clear();
+private SortedSet resolveMetadataForTargetAssignment() {
+final SortedSet assignmentReadyToReconcile = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
+final HashMap> unresolved = new 
HashMap<>(currentTargetAssignment);
+
 // Try to resolve topic names from metadata cache or subscription 
cache, and move
 // assignments from the "unresolved" collection, to the 
"readyToReconcile" one.

Review Comment:
   Let's update this comment now that we don't have the "unresolved" and 
"readyToReconcile" collections exactly in that shape anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Clean up core server classes [kafka]

2024-01-26 Thread via GitHub


jlprat commented on PR #15272:
URL: https://github.com/apache/kafka/pull/15272#issuecomment-1912344480

   @showuon if you have some time, this is a follow-up to the one you already 
reviewed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Clean up core server classes [kafka]

2024-01-26 Thread via GitHub


jlprat commented on code in PR #15272:
URL: https://github.com/apache/kafka/pull/15272#discussion_r1467861593


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -80,7 +80,7 @@ object BrokerMetadataPublisher extends Logging {
   Option(newTopicsImage.getPartition(topicId, partitionId)) match {
 case Some(partition) =>
   if (!partition.replicas.contains(brokerId)) {
-info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("Array(", ", ", ")")} " +

Review Comment:
   The previous code would actually have printed the memory reference and not 
the contents. With the current version, it prints the contents.



##
core/src/main/scala/kafka/server/ConfigAdminManager.scala:
##
@@ -154,26 +154,25 @@ class ConfigAdminManager(nodeId: Int,
   throw new InvalidRequestException(s"Unknown resource type 
${resource.resourceType().toInt}")
   }
 } catch {
-  case t: Throwable => {
+  case t: Throwable =>
 val err = ApiError.fromThrowable(t)
-info(s"Error preprocessing incrementalAlterConfigs request on 
${configResource}", t)
+info(s"Error preprocessing incrementalAlterConfigs request on 
$configResource", t)
 results.put(resource, err)
-  }
 }
   }
 })
 results
   }
 
-  def validateBrokerConfigChange(
+  private def validateBrokerConfigChange(
 resource: IAlterConfigsResource,
 configResource: ConfigResource
   ): Unit = {
 val perBrokerConfig = !configResource.name().isEmpty
 val persistentProps = configRepository.config(configResource)
 val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps, 
perBrokerConfig)
 val alterConfigOps = resource.configs().asScala.map {
-  case config =>
+  config =>

Review Comment:
   The previous code was the standard in really old versions of Scala; since 
then, the anonymous function style is preferred.



##
core/src/main/scala/kafka/server/ConfigHelper.scala:
##
@@ -80,7 +80,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
   def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
   includeSynonyms: Boolean,
   includeDocumentation: Boolean): 
List[DescribeConfigsResponseData.DescribeConfigsResult] = {
-resourceToConfigNames.map { case resource =>

Review Comment:
   The previous code was the standard in really old versions of Scala; since 
then, the anonymous function style is preferred.



##
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala:
##
@@ -98,14 +98,14 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
 }
   }
   quotaDelta.changes().entrySet().forEach { e =>
-handleUserClientQuotaChange(userClientEntity, e.getKey, 
e.getValue.asScala.map(_.toDouble))
+handleUserClientQuotaChange(userClientEntity, e.getKey, 
e.getValue.asScala)

Review Comment:
   The map to Double wasn't needed, as the value was already a double.



##
core/src/main/scala/kafka/server/ConfigAdminManager.scala:
##
@@ -369,15 +365,13 @@ object ConfigAdminManager {
 persistentResponses: AlterConfigsResponseData
   ): AlterConfigsResponseData = {
 val response = new AlterConfigsResponseData()
-val responsesByResource = 
persistentResponses.responses().iterator().asScala.map {
-  case r => (r.resourceName(), r.resourceType()) -> new 
ApiError(r.errorCode(), r.errorMessage())

Review Comment:
   The previous code was the standard in really old versions of Scala; since 
then, the anonymous function style is preferred.



##
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##
@@ -50,6 +50,10 @@ trait AutoTopicCreationManager {
 
 object AutoTopicCreationManager {
 
+  /**
+   * @deprecated use [[apply(kafka.server.KafkaConfig, 
kafka.server.MetadataCache, scala.Option, scala.Option, scala.Option, 
scala.Option, org.apache.kafka.coordinator.group.GroupCoordinator, 
kafka.coordinator.transaction.TransactionCoordinator)]]
+   */
+  @deprecated("Use the alternative apply method", "3.8.0")

Review Comment:
   I could alternatively delete this method altogether.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Quoc Phong Dang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Quoc Phong Dang reassigned KAFKA-16190:
---

Assignee: Quoc Phong Dang

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: client-transitions-issues, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced, should try to rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case given that it does explicitly change the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-26 Thread Quoc Phong Dang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Quoc Phong Dang reassigned KAFKA-16190:
---

Assignee: (was: Quoc Phong Dang)

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced, should try to rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case given that it does explicitly change the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Clean up core server classes [kafka]

2024-01-26 Thread via GitHub


jlprat opened a new pull request, #15272:
URL: https://github.com/apache/kafka/pull/15272

   Follow up from https://github.com/apache/kafka/pull/15252
   
   Cleanups in this follow-up:
   - Mark methods and fields private where possible
   - Annotate public methods and fields
   - Remove unused classes and methods
   - Make sure Arrays are not printed with .toString (see the sketch after this list)
   - Optimize minor warnings
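
   A minimal sketch of the Arrays point (not code from this PR):

       val ids = Array(1, 2, 3)
       println(ids.toString)                    // prints something like "[I@1b6d3586" (object identity)
       println(ids.mkString("[", ", ", "]"))    // prints "[1, 2, 3]"
       println(java.util.Arrays.toString(ids))  // also prints "[1, 2, 3]"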
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467835947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1340,18 +1336,15 @@ boolean reconciliationInProgress() {
  * When cluster metadata is updated, try to resolve topic names for topic 
IDs received in
  * assignment that hasn't been resolved yet.
  * 
- * Try to find topic names for all unresolved assignments
+ * Try to find topic names for all assignments
  * Add discovered topic names to the local topic names cache
  * If any topics are resolved, trigger a reconciliation 
process
  * If some topics still remain unresolved, request another 
metadata update
  * 
  */
 @Override
 public void onUpdate(ClusterResource clusterResource) {
-resolveMetadataForUnresolvedAssignment();
-if (!assignmentReadyToReconcile.isEmpty()) {
-reconcile();
-}
+reconcile();

Review Comment:
   We should probably reconcile here only if we're in the RECONCILING state, I 
would say. We could receive metadata updates at any time. With the current 
reconcile implementation I expect nothing bad would happen by calling 
reconcile on a metadata update when there is nothing to reconcile, but it's 
still not the right thing to do, so it's probably better to short-circuit here to 
avoid undesired effects/logs.
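
   A sketch of the guard I mean (assuming the member's current state is held in 
the existing `state` field):

       @Override
       public void onUpdate(ClusterResource clusterResource) {
           // Metadata updates can arrive at any time; only attempt a
           // reconciliation when one is actually pending.
           if (state == MemberState.RECONCILING) {
               reconcile();
           }
       }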



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467829868


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -766,23 +755,28 @@ public void transitionToStale() {
 }
 
 /**
- * Reconcile the assignment that has been received from the server and for 
which topic names
- * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
- * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+ * Reconcile the assignment that has been received from the server. If for 
some topics, the
+ * topic ID cannot be matched to a topic name, a metadata update will be 
triggered and only
+ * the subset of topics that are resolvable will be reconciled. 
Reconcilation will trigger the

Review Comment:
   typo in Reconciliation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1467829521


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -674,6 +662,7 @@ void transitionToSendingLeaveGroup() {
 ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 updateMemberEpoch(leaveEpoch);
 currentAssignment = new HashMap<>();
+targetAssignmentReconciled = 
currentAssignment.equals(currentTargetAssignment);

Review Comment:
   I wonder if this could end up not being accurate in some case. Thinking 
about a consumer RECONCILING, and then leaving the group while on it. Here we 
would end up with `targetAssignmentReconciled` false, which is not accurate, as 
the target is not relevant anymore and will be discarded when that delayed 
reconciliation completes. 
   
   Is there a value in the `targetAssignmentReconciled` var other than caching 
the sets comparison result? It definitely brings the challenge of making sure 
we update it with the right value, in all the right places. Would it be safer 
and simpler maybe just to keep the `targetAssignmentReconciled()` method, 
checking current vs target when needed?
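
   In other words, a minimal sketch of keeping the derived check (field names as 
in this PR):

       // Derive the result on demand instead of caching it, so it cannot go stale.
       private boolean targetAssignmentReconciled() {
           return currentAssignment.equals(currentTargetAssignment);
       }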



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -208,22 +209,18 @@ public class MembershipManagerImpl implements 
MembershipManager, ClusterResourceListener
 private final Map<Uuid, String> assignedTopicNamesCache;
 
 /**
- * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
- * Items are added to this set every time a target assignment is received. 
Items are removed
- * when metadata is found for the topic. This is where the member collects 
all assignments
- * received from the broker, even though they may not be ready to 
reconcile due to missing
+ * Topic IDs received in the last target assignment. Items are added to 
this set every time a

Review Comment:
   nit: Topic IDs "and partitions"...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467816570


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List<Record> records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional<Integer> generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");
+}
+
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
transitionToConsumerGroupHeartbeatTopicPartitions(
+List<TopicPartition> topicPartitions
+) {
+Map<String, List<Integer>> topicMap = new HashMap<>();
+topicPartitions.forEach(tp ->
+topicMap.computeIfAbsent(tp.topic(), __ -> new 
ArrayList<>()).add(tp.partition())
+);
+return topicMap.entrySet().stream().map(item -> {
+TopicImage topicImage = 
metadataImage.topics().getTopic(item.getKey());
+if (topicImage == null) {
+throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic 
id of topic " + item.getKey() + ".");
+}
+return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+.setTopicId(topicImage.id())
+.setPartitions(item.getValue());
+}).collect(Collectors.toList());
+}
+
+public CoordinatorResult<Void, Record> upgradeGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List<Record> records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+return EMPTY_RESULT;
+}
+
+// Get or create the consumer group.
+final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+throwIfConsumerGroupIsFull(group, memberId);
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// new dynamic member.
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+member = group.getOrMaybeCreateMember(memberId, true);
+newMemberCreated = !group.members().containsKey(memberId);
+log.info("[GroupId {}] Member {} joins the consumer group.", 

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467814642


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List<Record> records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional<Integer> generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");
+}
+
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
transitionToConsumerGroupHeartbeatTopicPartitions(
+List<TopicPartition> topicPartitions
+) {
+Map<String, List<Integer>> topicMap = new HashMap<>();
+topicPartitions.forEach(tp ->
+topicMap.computeIfAbsent(tp.topic(), __ -> new 
ArrayList<>()).add(tp.partition())
+);
+return topicMap.entrySet().stream().map(item -> {
+TopicImage topicImage = 
metadataImage.topics().getTopic(item.getKey());
+if (topicImage == null) {
+throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic 
id of topic " + item.getKey() + ".");
+}
+return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+.setTopicId(topicImage.id())
+.setPartitions(item.getValue());
+}).collect(Collectors.toList());
+}
+
+public CoordinatorResult<Void, Record> upgradeGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List<Record> records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+return EMPTY_RESULT;
+}
+
+// Get or create the consumer group.
+final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+throwIfConsumerGroupIsFull(group, memberId);
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// new dynamic member.
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+member = group.getOrMaybeCreateMember(memberId, true);
+newMemberCreated = !group.members().containsKey(memberId);
+log.info("[GroupId {}] Member {} joins the consumer group.", 

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467811816


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List<Record> records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional<Integer> generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");
+}
+
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
transitionToConsumerGroupHeartbeatTopicPartitions(
+List<TopicPartition> topicPartitions
+) {
+Map<String, List<Integer>> topicMap = new HashMap<>();
+topicPartitions.forEach(tp ->
+topicMap.computeIfAbsent(tp.topic(), __ -> new 
ArrayList<>()).add(tp.partition())
+);
+return topicMap.entrySet().stream().map(item -> {
+TopicImage topicImage = 
metadataImage.topics().getTopic(item.getKey());
+if (topicImage == null) {
+throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic 
id of topic " + item.getKey() + ".");
+}
+return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+.setTopicId(topicImage.id())
+.setPartitions(item.getValue());
+}).collect(Collectors.toList());
+}
+
+public CoordinatorResult<Void, Record> upgradeGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List<Record> records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+return EMPTY_RESULT;
+}
+
+// Get or create the consumer group.
+final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+throwIfConsumerGroupIsFull(group, memberId);
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// new dynamic member.
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+member = group.getOrMaybeCreateMember(memberId, true);
+newMemberCreated = !group.members().containsKey(memberId);
+log.info("[GroupId {}] Member {} joins the consumer group.", 

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467807980


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List<Record> records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional<Integer> generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");

Review Comment:
   maybe handle the case where generation id doesn't match the member epoch.
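
   For example (sketch only, reusing the names from this method):

       // Hypothetical: report a stale generation id explicitly instead of falling
       // through to the generic "no matching protocol" error below.
       if (generationId.isPresent() && generationId.get() != member.memberEpoch()) {
           throw new FencedMemberEpochException("The JoinGroup request has generation id " +
               generationId.get() + " but the member epoch known by the group " +
               "coordinator is " + member.memberEpoch() + ".");
       }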



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: cleanup core modules part 1 [kafka]

2024-01-26 Thread via GitHub


jlprat merged PR #15252:
URL: https://github.com/apache/kafka/pull/15252


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467804300


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List<Record> records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional<Integer> generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");

Review Comment:
   not supported by the old protocol



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: cleanup core modules part 1 [kafka]

2024-01-26 Thread via GitHub


jlprat commented on PR #15252:
URL: https://github.com/apache/kafka/pull/15252#issuecomment-1912259898

   All test failures seem to be unrelated to these changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed

2024-01-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16198:
---
Labels: client-transitions-issues clients consumer kip-848 
kip-848-client-support  (was: clients consumer kip-848 kip-848-client-support)

> Reconciliation may lose partitions when topic metadata is delayed
> -
>
> Key: KAFKA-16198
> URL: https://issues.apache.org/jira/browse/KAFKA-16198
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: client-transitions-issues, clients, consumer, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` 
> may lose part of the server-provided assignment when metadata is delayed. The 
> reason is incorrect handling of partially resolved topic names, as in this 
> example:
>  * We get assigned {{T1-1}} and {{T2-1}}
>  * We reconcile {{T1-1}}, {{T2-1}} remains in {{assignmentUnresolved}} 
> since the topic id {{T2}} is not known yet
>  * We get new cluster metadata, which includes {{T2}}, so {{T2-1}} is 
> moved to {{assignmentReadyToReconcile}}
>  * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment, 
> so {{T1-1}} is being revoked
>  * We end up with assignment {{T2-1}}, which is inconsistent with the 
> broker-side target assignment.
>  
> Generally, this seems to be a problem around semantics of the internal 
> collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence 
> of a topic in `assignmentReadyToReconcile` may mean either revocation of the 
> topic partition(s), or unavailability of a topic name for the topic.
> Internal state with simpler and correct invariants could be achieved by using 
> a single collection `currentTargetAssignment` which is based on topic IDs and 
> always corresponds to the latest assignment received from the broker. During 
> every attempted reconciliation, all topic IDs will be resolved from the local 
> cache, which should not introduce a lot of overhead. `assignmentUnresolved` 
> and `assignmentReadyToReconcile` are removed. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lucasbru commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912206859

   @lianetm Could you have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-26 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811308#comment-17811308
 ] 

Gaurav Narula commented on KAFKA-16162:
---

Raised https://github.com/apache/kafka/pull/15270

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request will include the `LogDirs` fields with UUID for each log dir in each 
> broker. This info will be stored in the controller and used to identify if 
> the log dir is known and online while handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> While upgrading from an old version, the kafka cluster will run the 3.7 binary 
> with the old metadata version, and then upgrade to the newer version using 
> kafka-features.sh. That means that while brokers start up and send the 
> brokerRegistration request, they'll be using the older metadata version without 
> the `LogDirs` fields included, which leaves the controller with no log dir info 
> for any broker. Later, after the upgrade, if a new topic is created, the flow 
> will go like this:
> 1. Controller assign replicas and adds in metadata log
> 2. brokers fetch the metadata and apply it
> 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment
> 4. After sending ASSIGN_REPLICAS_TO_DIRS to the controller with the replica 
> assignment, the controller will think the log dir of the current replica is 
> offline, so it triggers the offline handler and reassigns the leader to another 
> replica, which is also considered offline, and so on until there are no more 
> replicas to assign, at which point the leader is set to -1 (i.e. no leader) 
> So the result is that newly created topics are unavailable (with no 
> leader) because the controller thinks all log dirs are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
> quickstart-events3 --bootstrap-server localhost:9092  
> 
> Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
> 3   ReplicationFactor: 3Configs: segment.bytes=1073741824
>   Topic: quickstart-events3   Partition: 0Leader: none
> Replicas: 7,2,6 Isr: 6
>   Topic: quickstart-events3   Partition: 1Leader: none
> Replicas: 2,6,7 Isr: 6
>   Topic: quickstart-events3   Partition: 2Leader: none
> Replicas: 6,7,2 Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [AA, 7K5JBERyyqFFxIXSXYluJA, 

[PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-26 Thread via GitHub


lucasbru opened a new pull request, #15271:
URL: https://github.com/apache/kafka/pull/15271

   The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` 
may lose part of the server-provided assignment when metadata is delayed. The 
reason is incorrect handling of partially resolved topic names, as in this 
example:
   
* We get assigned `T1-1` and `T2-1`
* We reconcile `T1-1`, `T2-1` remains in `assignmentUnresolved` since the 
topic id T2 is not known yet
* We get new cluster metadata, which includes `T2`, so `T2-1` is moved to 
`assignmentReadyToReconcile`
* We call reconcile -- `T2-1` is now treated as the full assignment, so 
`T1-1` is being revoked
* We end up with assignment `T2-1`, which is inconsistent with the 
broker-side target assignment.
   
   Generally, this seems to be a problem around semantics of the internal 
collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence of 
a topic in `assignmentReadyToReconcile` may either mean revocation of the topic 
partition(s), or unavailability of a topic name for the topic, depending on the 
context.
   
   This change reimplements that part of the internal state of 
`MembershipManagerImpl` with simpler and more correct invariants by using a 
single collection `currentTargetAssignment` which is based on topic IDs and 
always corresponds to the latest assignment received from the broker. During 
every attempted reconciliation, all topic IDs will be resolved from the local 
cache, which should not introduce a lot of overhead. `assignmentUnresolved` and 
`assignmentReadyToReconcile` are removed. 
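
   As an illustration, the per-attempt resolution could look roughly like this (a 
sketch; field and method names are assumed, not necessarily those used in the 
change):

       // currentTargetAssignment: last assignment received from the broker, keyed by topic ID.
       Set<TopicIdPartition> resolvable = new HashSet<>();
       boolean allResolved = true;
       for (Map.Entry<Uuid, SortedSet<Integer>> entry : currentTargetAssignment.entrySet()) {
           String topicName = metadata.topicNames().get(entry.getKey()); // local metadata cache
           if (topicName == null) {
               allResolved = false; // keep the topic ID; a later metadata update may resolve it
               continue;
           }
           for (Integer partition : entry.getValue()) {
               resolvable.add(new TopicIdPartition(entry.getKey(), partition, topicName));
           }
       }
       if (!allResolved) {
           metadata.requestUpdate(true); // some topic IDs are still unknown locally
       }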
   
   This change is in line with the goal of using topic IDs instead of topic 
names in the consumer internal state, and fixes the bug of losing partitions 
when delayed metadata arrives.
   
   A unit test testing the above situation is added.
   
   Note that this change does not fully solve the reconciliation problems, because 
if a new assignment or new metadata arrives during an ongoing reconciliation, 
it will never be applied. This will be solved in a separate change 
(KAFKA-15832).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-26 Thread via GitHub


gaurav-narula commented on PR #15270:
URL: https://github.com/apache/kafka/pull/15270#issuecomment-1912201646

   CC: @showuon @OmniaGM @cmccabe @pprovenzano 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-26 Thread via GitHub


dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1467690595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -40,6 +42,29 @@ enum GroupType {
 public String toString() {
 return name;
 }
+
+/**
+ * Parse a string into the corresponding {@code GroupType} enum value, 
in a case-insensitive manner.
+ *
+ * @return The {{@link GroupType}} according to the string passed.
+ *
+ * @throws IllegalArgumentException If the input string does not match 
any {@code GroupType}.
+ */
+public static GroupType parse(String typeString) {
+for (GroupType type : GroupType.values()) {

Review Comment:
   Yeah, we don't have to change it here as it is not related. The minimum to 
do here is to also build a Map within the enum to do the lookup. It is better 
to be consistent.
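
   For reference, a sketch of that shape (constant names illustrative, not 
necessarily the exact ones in `Group.GroupType`):

       import java.util.Arrays;
       import java.util.Locale;
       import java.util.Map;
       import java.util.function.Function;
       import java.util.stream.Collectors;

       enum GroupType {
           CONSUMER("consumer"),
           CLASSIC("classic");

           private final String name;

           GroupType(String name) {
               this.name = name;
           }

           // Built once, so parse() is a map lookup instead of a scan over values().
           private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
               .collect(Collectors.toMap(type -> type.name, Function.identity()));

           public static GroupType parse(String typeString) {
               GroupType type = NAME_TO_ENUM.get(typeString.toLowerCase(Locale.ROOT));
               if (type == null) {
                   throw new IllegalArgumentException(typeString + " is not a valid group type.");
               }
               return type;
           }
       }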



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-26 Thread via GitHub


dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1467689455


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,38 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundException
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List<ListGroupsResponseData.ListedGroup> listGroups(
+Set<String> statesFilter,
+Set<String> typesFilter,
+long committedOffset
+) {
+// Converts each string to a value in the GroupType enum while being 
case-insensitive.
+Set<Group.GroupType> enumTypesFilter = typesFilter.stream()
+.map(Group.GroupType::parse)
+.collect(Collectors.toSet());

Review Comment:
   > In the consumerGroupCommand tool we return an illegalArgumentException if 
the type is unknown so I thought maybe it would make sense to do that here as 
well.
   
   Yeah, the context is a bit different here. You have to think about how 
illegalArgumentException will get returned to the user. The Kafka protocol does 
not have any error for this, so it converts it to an Unknown Server Error without 
any details. This is not great from a user perspective.
   
   I think that the correct way would be to try parsing the received group 
type. If it succeeds, great, we keep it. Otherwise, we simply discard it. If 
you want to keep the illegalArgumentException, you could catch it when you call 
`parse`.
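
   i.e. something along these lines (sketch):

       // Discard unknown types rather than letting IllegalArgumentException
       // surface to the user as an Unknown Server Error.
       Set<Group.GroupType> enumTypesFilter = typesFilter.stream()
           .flatMap(typeString -> {
               try {
                   return Stream.of(Group.GroupType.parse(typeString));
               } catch (IllegalArgumentException e) {
                   return Stream.<Group.GroupType>empty();
               }
           })
           .collect(Collectors.toSet());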



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-01-26 Thread via GitHub


gaurav-narula commented on PR #15262:
URL: https://github.com/apache/kafka/pull/15262#issuecomment-1912105919

   @showuon added a test with commit 
https://github.com/apache/kafka/pull/15262/commits/d0858deb5500ebdec071bc06ca6f98726ae6f602.
 Please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


