Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-31 Thread via GitHub


lucasbru merged PR #15271:
URL: https://github.com/apache/kafka/pull/15271





Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-31 Thread via GitHub


yashmayya commented on code in PR #15249:
URL: https://github.com/apache/kafka/pull/15249#discussion_r1473904200


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java:
##
@@ -963,11 +962,19 @@ public Set activeWorkers() {
 return workers().stream()

Review Comment:
   I'm unable to comment on that line directly, but the `ObjectMapper` 
initialized in the line above can be removed now.






[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16156:
--
Component/s: system tests

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, system tests
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> 
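
The trace is truncated in the archive, but the failing frame is already clear: {{OffsetAndTimestamp}} rejects the -1 sentinel timestamp that the broker returns in the ListOffsets response. A minimal sketch of that failure mode, using a hypothetical stand-in class that mirrors only the negative-timestamp check reported at OffsetAndTimestamp.java:39, not the real constructor:

{code:java}
// Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndTimestamp;
// it reproduces only the validation that throws in the stack trace above.
public class OffsetAndTimestampSketch {
    private final long offset;
    private final long timestamp;

    public OffsetAndTimestampSketch(long offset, long timestamp) {
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
    }

    public static void main(String[] args) {
        // The ListOffsets response above carries offset=42804, timestamp=-1;
        // wrapping that pair reproduces the IllegalArgumentException.
        new OffsetAndTimestampSketch(42804L, -1L);
    }
}
{code}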

[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16156:
--
Labels: consumer-threading-refactor kip-848-client-support system-tests  
(was: consumer-threading-refactor kip-848-client-support)

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> 

[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16115:
--
Labels: consumer-threading-refactor metrics  (was: 
consumer-threading-refactor)

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
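
For reference, a minimal sketch of registering sensors for these names with the client-side {{Metrics}} API. The group string {{consumer-coordinator-metrics}} and the sensor names are assumptions for illustration; this is not the actual AsyncKafkaConsumer wiring:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;

public class HeartbeatMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "consumer-coordinator-metrics"; // assumed group name

        // heartbeat-rate / heartbeat-total as one meter, plus the max response time
        Sensor heartbeat = metrics.sensor("heartbeat-latency");
        heartbeat.add(new Meter(metrics.metricName("heartbeat-rate", group),
                                metrics.metricName("heartbeat-total", group)));
        heartbeat.add(metrics.metricName("heartbeat-response-time-max", group), new Max());

        // last-heartbeat-seconds-ago is a gauge computed at read time;
        // last-rebalance-seconds-ago would follow the same pattern.
        long lastHeartbeatMs = System.currentTimeMillis();
        metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", group),
                (config, now) -> (now - lastHeartbeatMs) / 1000.0);

        heartbeat.record(42.0); // record one heartbeat's response time in ms
        metrics.close();
    }
}
{code}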





[jira] [Updated] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16112:
--
Labels: consumer-threading-refactor metrics  (was: 
consumer-threading-refactor)

> Review JMX metrics in Async Consumer and determine the missing ones
> ---
>
> Key: KAFKA-16112
> URL: https://issues.apache.org/jira/browse/KAFKA-16112
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, metrics
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16113:
--
Labels: consumer-threading-refactor metrics  (was: 
consumer-threading-refactor)

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total
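
A sketch of one plausible registration for these names, again via the public {{Metrics}} API; the sensor names and group string are illustrative assumptions:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;

public class CommitMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "consumer-coordinator-metrics"; // assumed group name

        // commit-latency-avg / commit-latency-max / commit-rate / commit-total
        Sensor commit = metrics.sensor("commit-latency");
        commit.add(metrics.metricName("commit-latency-avg", group), new Avg());
        commit.add(metrics.metricName("commit-latency-max", group), new Max());
        commit.add(new Meter(metrics.metricName("commit-rate", group),
                             metrics.metricName("commit-total", group)));

        // committed-time-ns-total accumulates total time spent in committed()
        Sensor committedTime = metrics.sensor("committed-time");
        committedTime.add(metrics.metricName("committed-time-ns-total", group),
                new CumulativeSum());

        commit.record(12.0);             // one commit took 12 ms
        committedTime.record(3_500_000); // one committed() call, in ns
        metrics.close();
    }
}
{code}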





[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16142:
--
Labels: consumer-threading-refactor metrics  (was: 
consumer-threading-refactor)

> Update metrics documentation for errors and new metrics
> ---
>
> Key: KAFKA-16142
> URL: https://issues.apache.org/jira/browse/KAFKA-16142
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16116:
--
Labels: consumer-threading-refactor metrics  (was: 
consumer-threading-refactor)

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
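
The per-hour names suggest a {{Meter}} configured with an hours time unit; a minimal sketch under the same assumptions as the sketches above (group string and sensor names are illustrative):

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;

public class RebalanceMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "consumer-coordinator-metrics"; // assumed group name

        // rebalance-latency-{avg,max,total} plus the per-hour rate/total pair
        Sensor rebalance = metrics.sensor("rebalance-latency");
        rebalance.add(metrics.metricName("rebalance-latency-avg", group), new Avg());
        rebalance.add(metrics.metricName("rebalance-latency-max", group), new Max());
        rebalance.add(metrics.metricName("rebalance-latency-total", group), new CumulativeSum());
        rebalance.add(new Meter(TimeUnit.HOURS,
                metrics.metricName("rebalance-rate-per-hour", group),
                metrics.metricName("rebalance-total", group)));

        // failed rebalances get their own per-hour meter
        Sensor failed = metrics.sensor("failed-rebalance");
        failed.add(new Meter(TimeUnit.HOURS,
                metrics.metricName("failed-rebalance-rate-per-hour", group),
                metrics.metricName("failed-rebalance-total", group)));

        rebalance.record(250.0); // one rebalance took 250 ms
        metrics.close();
    }
}
{code}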





[jira] [Updated] (KAFKA-16143) New metrics for KIP-848 protocol

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16143:
--
Labels: kip-848-client-support metrics  (was: kip-848-client-support)

> New metrics for KIP-848 protocol
> 
>
> Key: KAFKA-16143
> URL: https://issues.apache.org/jira/browse/KAFKA-16143
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support, metrics
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16109:
--
Component/s: system tests

> Ensure system tests cover the "simple consumer + commit" use case
> -
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set<TopicPartition> partitions) {
> subscriptions.assignFromUser(partitions);
> client.updateMetadata(initialUpdateResponse);
> // A dummy metadata update to ensure valid leader epoch.
> metadata.updateWithCurrentRequestVersion(
> RequestTestUtils.metadataUpdateWithIds(
> "dummy",
> 1, 
> Collections.emptyMap(),
> singletonMap(topicName, 4),
> tp -> validLeaderEpoch, topicIds
> ),
> false,
> 0L
> );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.





[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781





[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15635:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
> -
>
> Key: KAFKA-15635
> URL: https://issues.apache.org/jira/browse/KAFKA-15635
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there 
> is only 1 assigned partition?





[jira] [Updated] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16000:
--
Labels: consumer-threading-refactor unit-tests  (was: 
consumer-threading-refactor)

> Migrate MembershipManagerImplTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16000
> URL: https://issues.apache.org/jira/browse/KAFKA-16000
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{setWakeupHook}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{networkClientDelegate.disconnectAsync}}.





[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Labels: consumer-threading-refactor unit-tests  (was: 
consumer-threading-refactor)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 





[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15652:
--
Labels: consumer-threading-refactor position unit-tests  (was: 
consumer-threading-refactor position)

> Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
> 
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, position, unit-tests
> Fix For: 3.8.0
>
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{FetchCollector.handleInitializeErrors()}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{poll}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{OffsetOutOfRangeException}}.
> {quote}
> I intentionally changed the code so that no exceptions were thrown in 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an 
> empty map. When I ran the unit tests and integration tests, there were no 
> failures, strongly suggesting that there is no coverage of this particular 
> edge case.





[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15991:
--
Labels: consumer-threading-refactor flaky-test kip-848-client-support 
unit-tests  (was: consumer-threading-refactor flaky-test kip-848-client-support)

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, 
> kip-848-client-support, unit-tests
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  Failed the build so temporarily disabled.





[jira] [Updated] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15999:
--
Labels: consumer-threading-refactor unit-tests  (was: 
consumer-threading-refactor)

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Labels: consumer-threading-refactor unit-tests  (was: 
consumer-threading-refactor)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>
> Regarding this comment in {{testPollResultTimer}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}





[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16001:
--
Labels: consumer-threading-refactor unit-tests  (was: 
consumer-threading-refactor)

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15636:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?





[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14724:
--
Labels: consumer-threading-refactor fetcher kip-848-e2e kip-848-preview 
unit-tests  (was: consumer-threading-refactor fetcher kip-848-e2e 
kip-848-preview)

> Port tests in FetcherTest to FetchRequestManagerTest
> 
>
> Key: KAFKA-14724
> URL: https://issues.apache.org/jira/browse/KAFKA-14724
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher, kip-848-e2e, 
> kip-848-preview, unit-tests
> Fix For: 3.7.0
>
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{Fetcher}}.
> This task involves copying the relevant tests from {{FetcherTest}} and 
> modifying them to fit a new unit test named {{FetchRequestManagerTest}}.





[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15634:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
> 
>
> Key: KAFKA-15634
> URL: https://issues.apache.org/jira/browse/KAFKA-15634
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> What is the point of the code in the initial {{while}} loop since the receive 
> is delayed and thus there is no {{throttleDelayMs}} received in the client?





[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15617:
--
Labels: consumer-threading-refactor fetcher unit-tests  (was: 
consumer-threading-refactor fetcher)

> Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions 
> and testInflightFetchOnPendingPartitions overlap
> --
>
> Key: KAFKA-15617
> URL: https://issues.apache.org/jira/browse/KAFKA-15617
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
> Fix For: 4.0.0
>
>
> In FetcherTest, the two tests testFetchingPendingPartitions and 
> testInflightFetchOnPendingPartitions have significant overlap. Perhaps the 
> former subsumes the latter?





[jira] [Updated] (KAFKA-15971) Re-enable consumer integration tests for new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15971:
--
Component/s: (was: unit tests)

> Re-enable consumer integration tests for new consumer
> -
>
> Key: KAFKA-15971
> URL: https://issues.apache.org/jira/browse/KAFKA-15971
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, kip-848, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Re-enable the consumer integration tests for the new consumer, making sure 
> that build stability is not impacted.





[jira] [Updated] (KAFKA-15971) Re-enable consumer integration tests for new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15971:
--
Labels: consumer-threading-refactor integration-tests kip-848 
kip-848-preview  (was: consumer-threading-refactor kip-848 kip-848-preview 
system-tests)

> Re-enable consumer integration tests for new consumer
> -
>
> Key: KAFKA-15971
> URL: https://issues.apache.org/jira/browse/KAFKA-15971
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, kip-848, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Re-enable the consumer integration tests for the new consumer, making sure 
> that build stability is not impacted.





[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15932:
--
Labels: consumer-threading-refactor flaky-test integration-tests 
kip-848-client-support  (was: consumer-threading-refactor flaky-test 
integration-tests kip-848 kip-848-client-support)

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> ---
>
> Key: KAFKA-15932
> URL: https://issues.apache.org/jira/browse/KAFKA-15932
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, 
> integration-tests, kip-848-client-support
> Fix For: 3.7.0
>
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>   at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> 

[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15932:
--
Labels: consumer-threading-refactor flaky-test integration-tests kip-848 
kip-848-client-support  (was: consumer-threading-refactor flaky-test kip-848 
kip-848-client-support system-tests)

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> ---
>
> Key: KAFKA-15932
> URL: https://issues.apache.org/jira/browse/KAFKA-15932
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, 
> integration-tests, kip-848, kip-848-client-support
> Fix For: 3.7.0
>
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>   at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> 

[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15932:
--
Component/s: (was: unit tests)

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> ---
>
> Key: KAFKA-15932
> URL: https://issues.apache.org/jira/browse/KAFKA-15932
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support, system-tests
> Fix For: 3.7.0
>
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>   at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> 

[jira] [Updated] (KAFKA-15986) New consumer group protocol integration test failures

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15986:
--
Component/s: (was: unit tests)

> New consumer group protocol integration test failures
> -
>
> Key: KAFKA-15986
> URL: https://issues.apache.org/jira/browse/KAFKA-15986
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> A recent change in `AsyncKafkaConsumer.updateFetchPositions` has made 
> fetching fail without returning records in some situations.





[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15991:
--
Labels: consumer-threading-refactor flaky-test kip-848-client-support  
(was: consumer-threading-refactor flaky-test kip-848 kip-848-client-support 
system-tests)

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  Failed the build so temporarily disabled.





[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15515:
--
Component/s: (was: unit tests)

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 3.7.0
>
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new consumer 
> while in development. It should be deleted when the new consumer is fully 
> implemented and the existing integration tests (`PlaintextConsumerTest`) can 
> be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as such for the new 
> consumer implementation for 2 main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer. 
> - the new consumer is under development, so it does not support all the 
> consumer functionality yet. 
>  
> In order to run the subsets of tests that the new consumer 
> supports while the implementation completes, it was decided to:
> - make a copy of the `PlaintextConsumerTest` class, named 
> `PlaintextAsyncConsumer`.
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer
> evolves.
>  
>  





[jira] [Updated] (KAFKA-15986) New consumer group protocol integration test failures

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15986:
--
Labels: consumer-threading-refactor integration-tests kip-848 
kip-848-client-support  (was: consumer-threading-refactor kip-848 
kip-848-client-support system-tests)

> New consumer group protocol integration test failures
> -
>
> Key: KAFKA-15986
> URL: https://issues.apache.org/jira/browse/KAFKA-15986
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> A recent change in `AsyncKafkaConsumer.updateFetchPositions` has made 
> fetching fail without returning records in some situations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15515:
--
Labels: consumer-threading-refactor integration-tests  (was: 
consumer-threading-refactor system-tests)

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 3.7.0
>
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new consumer 
> while in development. It should be deleted when the new consumer is fully 
> implemented and the existing integration tests (`PlaintextConsumerTest`) can 
> be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as such for the new 
> consumer implementation for two main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer; 
> - the new consumer is under development, so it does not yet support all the 
> consumer functionality. 
>  
> In order to be able to run the subsets of tests that the new consumer 
> supports while the implementation completes, it was decided to: 
>  - make a copy of the `PlaintextConsumerTest` class, named 
> `PlaintextAsyncConsumer`;
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer
> evolves.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16167:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support system-tests)

> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16167:
--
Component/s: (was: unit tests)

> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16109:
--
Component/s: (was: system tests)

> Ensure system tests cover the "simple consumer + commit" use case
> -
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16109:
--
Labels: consumer-threading-refactor system-tests  (was: 
consumer-threading-refactor)

> Ensure system tests cover the "simple consumer + commit" use case
> -
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16104:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests  (was: 
consumer-threading-refactor)

> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 3.8.0
>
>
> It should be possible to enable:
>  * testAutoCommitOnClose
>  * testAutoCommitOnCloseAfterWakeup
>  * testExpandingTopicSubscriptions
>  * testShrinkingTopicSubscriptions
>  * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
>  * testMultiConsumerSessionTimeoutOnStopPolling
>  * testAutoCommitOnRebalance
>  * testPerPartitionLeadMetricsCleanUpWithSubscribe
>  * testPerPartitionLagMetricsCleanUpWithSubscribe
>  * testStaticConsumerDetectsNewPartitionCreatedAfterRestart



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15993:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests kip-848-preview 
timeout  (was: consumer-threading-refactor kip-848-preview timeout)

> Enable max poll integration tests that depend on callback invocation
> 
>
> Key: KAFKA-15993
> URL: https://issues.apache.org/jira/browse/KAFKA-15993
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-preview, timeout
> Fix For: 3.8.0
>
>
> We will enable integration tests using the async consumer in KAFKA-15971.  
> However, we should also enable tests that rely on rebalance listeners after 
> KAFKA-15628 is closed.  One example would be testMaxPollIntervalMs, which 
> relies on the listener to verify correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
> do {
> final RequestFuture<Integer> future = sendSomeRequest(partitions);
> client.poll(future, timer);
> if (future.isDone())
> return future.get();
> } while (timer.notExpired());
> return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
> try {
> CompletableFuture<Integer> future = new CompletableFuture<>();
> applicationEventQueue.add(future);
> return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
> } catch (TimeoutException e) {
> return -1;
> }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(
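> For comparison only, here is a minimal sketch of a result-first structure 
> (this is not the actual consumer code; {{backgroundEventProcessor.runOnce()}} 
> is a hypothetical stand-in for driving the background thread once, the way 
> the legacy path drives {{NetworkClient.poll()}}):
> {code:java}
> private int getCount(Timer timer) {
>     CompletableFuture<Integer> future = new CompletableFuture<>();
>     applicationEventQueue.add(future);
>     do {
>         // Give the background thread at least one pass, even when
>         // timer.remainingMs() == 0 (hypothetical helper).
>         backgroundEventProcessor.runOnce();
>         timer.update();
>     } while (!future.isDone() && timer.notExpired());
>     // Result-first: a completed poll(0) wins over an expired timer.
>     return future.isDone() ? future.getNow(-1) : -1;
> }
> {code}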



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16023:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) use:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, the timing of reconciliation completion is not deterministic due 
> to asynchronous processing, so we actually need to wait for the condition to 
> happen.
> Another issue is the timeout - some of these tasks might not 
> complete within the 600ms timeout, so the tests are deemed flaky.
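> A minimal sketch of the waiting pattern, written in Java for illustration 
> (the actual tests are in Scala, and the listener's callsToAssigned counter is 
> assumed from the assertion above); it uses the org.apache.kafka.test.TestUtils 
> helper:
> {code:java}
> import java.time.Duration;
> import org.apache.kafka.test.TestUtils;
>
> TestUtils.waitForCondition(() -> {
>     // Rebalance listener callbacks fire inside poll(), so keep polling
>     // while waiting for the reconciliation to complete.
>     consumer.poll(Duration.ofMillis(100));
>     return listener.callsToAssigned >= 1;
> }, 15_000L, "the listener should be assigned at least once");
> {code}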



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16019:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> Some of the tests in PlaintextConsumer can't seem to deterministically invoke 
> and verify the consumer callback
> --
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> I was running the PlaintextConsumer tests against the async consumer; 
> however, a few tests were failing because they could not verify that the 
> listener was invoked correctly.
> For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, 
> around 50% of the time the listener's callsToAssigned was never incremented 
> correctly.  Even after changing it to awaitUntilTrue, the result was the same:
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16010:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
> --
>
> Key: KAFKA-16010
> URL: https://issues.apache.org/jira/browse/KAFKA-16010
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3] after one consumer left
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16008:
--
Component/s: (was: system tests)
 Labels: consumer-threading-refactor integration-tests timeout  (was: 
consumer-threading-refactor timeout)

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16134:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16134:
--
Component/s: (was: system tests)

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16151:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
> -
>
> Key: KAFKA-16151
> URL: https://issues.apache.org/jira/browse/KAFKA-16151
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16011:
--
Labels: integration-tests kip-848-client-support timeout  (was: 
kip-848-client-support timeout)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
> 
>
> Key: KAFKA-16011
> URL: https://issues.apache.org/jira/browse/KAFKA-16011
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: integration-tests, kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, 
> topic-1), Set(), Set(topic1-1, topic1-5))
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376)
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to 
> FATAL state 
> (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456)
> [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139)
> org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs 
> must be provided in first request.
> [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16150:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support system-tests)

> Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
> 
>
> Key: KAFKA-16150
> URL: https://issues.apache.org/jira/browse/KAFKA-16150
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16135:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> ---
>
> Key: KAFKA-16135
> URL: https://issues.apache.org/jira/browse/KAFKA-16135
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The test
> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
> is incredibly flaky: it failed 3 builds in a row for the 3.7 release 
> candidate, but with different JDK versions. Locally it also fails often and 
> requires a few retries to pass.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16150:
--
Component/s: (was: system tests)

> Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
> 
>
> Key: KAFKA-16150
> URL: https://issues.apache.org/jira/browse/KAFKA-16150
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16011:
--
Component/s: (was: system tests)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
> 
>
> Key: KAFKA-16011
> URL: https://issues.apache.org/jira/browse/KAFKA-16011
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: integration-tests, kip-848-client-support, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, 
> topic-1), Set(), Set(topic1-1, topic1-5))
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376)
> [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to 
> FATAL state 
> (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456)
> [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139)
> org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs 
> must be provided in first request.
> [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16135:
--
Component/s: (was: system tests)

> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> ---
>
> Key: KAFKA-16135
> URL: https://issues.apache.org/jira/browse/KAFKA-16135
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The test
> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
> is incredibly flaky: it failed 3 builds in a row for the 3.7 release 
> candidate, but with different JDK versions. Locally it also fails often and 
> requires a few retries to pass.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16152:
--
Component/s: (was: system tests)

> Fix 
> PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
> --
>
> Key: KAFKA-16152
> URL: https://issues.apache.org/jira/browse/KAFKA-16152
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16151:
--
Component/s: (was: system tests)

> Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
> -
>
> Key: KAFKA-16151
> URL: https://issues.apache.org/jira/browse/KAFKA-16151
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16037) Upgrade existing system tests to use new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16037:
--
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Upgrade existing system tests to use new consumer
> -
>
> Key: KAFKA-16037
> URL: https://issues.apache.org/jira/browse/KAFKA-16037
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Dongnuo Lyu
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to parameterize the tests to run twice: once for the old and 
> once for the new Consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16152:
--
Labels: consumer-threading-refactor integration-tests 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> Fix 
> PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
> --
>
> Key: KAFKA-16152
> URL: https://issues.apache.org/jira/browse/KAFKA-16152
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer

2024-01-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [WIP] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]

2024-01-31 Thread via GitHub


jolshan opened a new pull request, #15303:
URL: https://github.com/apache/kafka/pull/15303

   Playing around with adding a transaction version feature
   
   WIP
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-31 Thread via GitHub


showuon commented on PR #15273:
URL: https://github.com/apache/kafka/pull/15273#issuecomment-1920457353

   > @showuon I rebased and addressed your comments. Did not remove properties 
definitions from KafkaConfig, because this breaks a lot of tests. I think this 
should be done as part of KafkaConfig migration. In the end it is a small 
amount of duplication, but properties are always defined in one place.
   
   Thanks @fvaleri ! I had another look, and I still think the duplication is not 
a great idea. It might cause a potential issue someday, even if it looks good 
now. From what I can see, almost all the config validation exists in 
KafkaConfig, so could we remove the config definition in LogConfig? My 
thought is that LogConfig could do something similar to RaftConfig, which has 
no config definition there. If you want to have a copy of the config value, you 
could pass the KafkaConfig instance into LogConfig 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1513).
 Do you think that makes sense? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


gaurav-narula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473758133


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
   assertTrue(fooPart eq fooPart2)
   val bar1 = new TopicPartition("bar", 1)
   replicaManager.markPartitionOffline(bar1)
-  assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, 
BAR_UUID))
+  val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, 
emptyDelta, BAR_UUID).get
+  assertTrue(barNew)
+  assertEquals(bar1, barPart.topicPartition)

Review Comment:
   Addressed in 
[cdf9c0f](https://github.com/apache/kafka/pull/15263/commits/cdf9c0f0cf817e68ca27092addb21308a9bc3762)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-909 [kafka]

2024-01-31 Thread via GitHub


github-actions[bot] commented on PR #14691:
URL: https://github.com/apache/kafka/pull/14691#issuecomment-1920440845

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16214) No user info when SASL authentication failure

2024-01-31 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16214:
-

 Summary: No user info when SASL authentication failure
 Key: KAFKA-16214
 URL: https://issues.apache.org/jira/browse/KAFKA-16214
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.0
Reporter: Luke Chen
Assignee: Luke Chen


When client authentication fails, the server logs only the client IP address. 
The IP address sometimes cannot identify a specific user, especially 
if there is a proxy between client and server. Ex:


{code:java}
INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with 
/127.0.0.1 (channelId=127.0.0.1:9093-127.0.0.1:53223-5) (Authentication failed: 
Invalid username or password) (org.apache.kafka.common.network.Selector)
{code}


If many failed-authentication logs appear on the server, it'd be better to 
quickly identify who is triggering them. Adding the client info to the log 
is a good start. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473746536


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
   assertTrue(fooPart eq fooPart2)
   val bar1 = new TopicPartition("bar", 1)
   replicaManager.markPartitionOffline(bar1)
-  assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, 
BAR_UUID))
+  val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, 
emptyDelta, BAR_UUID).get
+  assertTrue(barNew)
+  assertEquals(bar1, barPart.topicPartition)

Review Comment:
   Could we add a test case for returning `None`?
   Since after this PR we'll have two cases when creating a partition that is in 
OfflinePartition, we should test them both.



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -3805,4 +3805,281 @@ class PartitionTest extends AbstractPartitionTest {
   
when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
 }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def makeLeaderInvokesgetOrCreateLog_OnOnlineLogDir(isNew: Boolean): Unit = {

Review Comment:
   Thanks for adding the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16180: Fix UMR and LAIR handling during ZK migration [kafka]

2024-01-31 Thread via GitHub


mumrah commented on code in PR #15293:
URL: https://github.com/apache/kafka/pull/15293#discussion_r1473671017


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: 
mutable.AnyRefMap[String, mutable.L
 }
 
 object ZkMetadataCache {
-  /**
-   * Create topic deletions (leader=-2) for topics that are missing in a FULL 
UpdateMetadataRequest coming from a
-   * KRaft controller during a ZK migration. This will modify the 
UpdateMetadataRequest object passed into this method.
-   */
-  def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+  def transformKRaftControllerFullMetadataRequest(
 currentMetadata: MetadataSnapshot,
 requestControllerEpoch: Int,
 requestTopicStates: util.List[UpdateMetadataTopicState],
-  ): Seq[Uuid] = {
-val prevTopicIds = currentMetadata.topicIds.values.toSet
-val requestTopics = requestTopicStates.asScala.map { topicState =>
-  topicState.topicName() -> topicState.topicId()
-}.toMap
-
-val deleteTopics = prevTopicIds -- requestTopics.values.toSet
-if (deleteTopics.isEmpty) {
-  return Seq.empty
+  ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), 
state))
+val logMessages = new util.ArrayList[String]
+val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+currentMetadata.topicNames.forKeyValue((id, name) => {
+  Option(topicIdToNewState.get(id)) match {
+case None =>
+  currentMetadata.partitionStates.get(name) match {
+case None => logMessages.add(s"Error: topic ${name} appeared in 
currentMetadata.topicNames, " +
+  "but not in currentMetadata.partitionStates.")
+case Some(oldPartitionStates) =>
+  logMessages.add(s"Removing topic ${name} with ID ${id} from the 
metadata cache since " +
+"the full UMR did not include it.")
+  newRequestTopicStates.add(createDeletionEntries(name,
+id,
+oldPartitionStates.values,
+requestControllerEpoch))
+  }
+case Some(newTopicState) =>
+  val indexToState = new util.HashMap[Integer, 
UpdateMetadataPartitionState]
+  newTopicState.partitionStates().forEach(part => 
indexToState.put(part.partitionIndex, part))
+  currentMetadata.partitionStates.get(name) match {
+case None => logMessages.add(s"Error: topic ${name} appeared in 
currentMetadata.topicNames, " +
+  "but not in currentMetadata.partitionStates.")
+case Some(oldPartitionStates) =>
+  oldPartitionStates.foreach(state => 
indexToState.remove(state._1.toInt))
+  if (!indexToState.isEmpty) {
+logMessages.add(s"Removing ${indexToState.size()} partition(s) 
from topic ${name} with " +
+  s"ID ${id} from the metadata cache since the full UMR did 
not include them.")
+newRequestTopicStates.add(createDeletionEntries(name,
+  id,
+  indexToState.values().asScala,
+  requestControllerEpoch))
+  }
+  }
+  }
+})
+if (newRequestTopicStates.isEmpty) {
+  // If the output is the same as the input, optimize by just returning 
the input.
+  (requestTopicStates, logMessages)
+} else {
+  // If the output has some new entries, they should all appear at the 
beginning. This will
+  // ensure that the old stuff is cleared out before the new stuff is 
added. We will need a
+  // new list for this, of course.
+  newRequestTopicStates.addAll(requestTopicStates)
+  (newRequestTopicStates, logMessages)
 }
+  }
 
-deleteTopics.foreach { deletedTopicId =>
-  val topicName = currentMetadata.topicNames(deletedTopicId)

Review Comment:
   I believe this was the source of the NoSuchElementException reported in the 
JIRA, is that right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12549) Allow state stores to opt-in transactional support

2024-01-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812993#comment-17812993
 ] 

Matthias J. Sax commented on KAFKA-12549:
-

[~high.lee] – this ticket is superceded by 
https://issues.apache.org/jira/browse/KAFKA-14412 which is already WIP.

> Allow state stores to opt-in transactional support
> --
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Right now Kafka Stream's EOS implementation does not make any assumptions 
> about the state store's transactional support. Allowing the state stores to 
> optionally provide transactional support can have multiple benefits. E.g., if 
> we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, 
> {{commitTxn}} and {{abortTxn}}. Streams library can determine if these are 
> supported via an additional {{boolean transactional()}} API, and if yes the 
> these APIs can be used under both ALOS and EOS like the following (otherwise 
> then just fallback to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like above, we can have the 
> following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is not EOS 
> still, but some middle-ground where uncommitted data within a state store 
> would not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes 
> for EOS. E.g., if a crash failure happens between the streams commit 
> completing and store.commitTxn, we can instead just roll forward the transaction by 
> replaying the changelog from the second-most-recent streams committed offset 
> towards the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping 
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see 
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself has natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon 
> `commitTxn` write the whole buffer as a batch to the underlying state store, 
> or just drop the whole buffer upon aborting. Then for interactive queries, 
> one can optionally query the underlying store for committed data only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn` 
> create a new empty transient store, and upon `commitTxn` merge the store into 
> the underlying store. The same applies to interactively querying committed-only 
> data. This has a benefit compared with the option above in that there's no memory 
> pressure even with long transactions, but it incurs more complexity / 
> performance overhead with the separate persistent store.
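> A minimal sketch of the opt-in API named above (illustrative only, not 
> committed Kafka code; the method names follow this ticket's proposal):
> {code:java}
> import org.apache.kafka.streams.processor.StateStore;
>
> public interface TransactionalStateStore extends StateStore {
>     // Lets the Streams library detect whether the txn methods are usable.
>     boolean transactional();
>     // Start buffering writes; nothing becomes visible to readers of
>     // committed data until commitTxn() succeeds.
>     void beginTxn();
>     // Atomically expose everything written since beginTxn().
>     void commitTxn();
>     // Discard everything written since beginTxn().
>     void abortTxn();
> }
> {code}
> With such an interface, the processing loop above becomes: beginTxn(), put() 
> during processing, streams commit, commitTxn(), then beginTxn() for the next 
> transaction.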



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]

2024-01-31 Thread via GitHub


gharris1727 commented on code in PR #14663:
URL: https://github.com/apache/kafka/pull/14663#discussion_r1473603606


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1994,53 +1616,6 @@ private void expectInitializeTask() {
 PowerMock.expectLastCall();
 }
 
-private void expectRebalanceLossError(RuntimeException e) {
-sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
-EasyMock.expectLastCall().andThrow(e);
-
-EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-() -> {
-
rebalanceListener.getValue().onPartitionsLost(INITIAL_ASSIGNMENT);
-return ConsumerRecords.empty();
-});
-}
-
-private void expectRebalanceRevocationError(RuntimeException e) {
-sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT));
-EasyMock.expectLastCall().andThrow(e);
-
-sinkTask.preCommit(EasyMock.anyObject());
-EasyMock.expectLastCall().andReturn(Collections.emptyMap());
-
-EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-() -> {
-
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
-return ConsumerRecords.empty();
-});
-}
-
-private void expectRebalanceAssignmentError(RuntimeException e) {
-sinkTask.close(INITIAL_ASSIGNMENT);
-EasyMock.expectLastCall();
-
-sinkTask.preCommit(EasyMock.anyObject());
-EasyMock.expectLastCall().andReturn(Collections.emptyMap());
-
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
-sinkTask.open(INITIAL_ASSIGNMENT);
-EasyMock.expectLastCall().andThrow(e);
-
-
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
-EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-() -> {
-
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
-
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
-return ConsumerRecords.empty();
-});
-}
-
 private void expectPollInitialAssignment() {

Review Comment:
   I can't put the comment on the right line, but the next function 
`expectConsumerWakeup()` in this file is now unused.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import 

Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-01-31 Thread via GitHub


mjsax commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1473438104


##
docs/streams/developer-guide/config-streams.html:
##
@@ -1010,6 +1016,18 @@ topology.optimization
+  windowed.inner.class.serde
+  
+
+  
+Serde for the inner class of a windowed record. Must implement the 
org.apache.kafka.common.serialization.Serde interface.
+  
+  
+Note that setting this config in a Kafka Streams application would 
result in an error, as it is meant to be used only from the plain consumer client.

Review Comment:
   Thanks Sophie.
   
   > Note that this config is only used by plain consumer/producer clients 
   
   For this case, why are we documenting it in KS docs -- should it not be in 
clients docs? (Also, this applies to `window.size.ms` introduced in 2.8 via 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size,
 right? But not to `windowed.inner.class.serde`, which is a KS config added in 
3.0 via 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930.)
   
   In the end, the KS module provides the window serdes, and thus it does not make 
sense to add `window.size.ms` to `ClientConfig` -- especially, as you pointed 
out, because it is only necessary for the console consumer.
   
   Thus, while `StreamsConfig#WINDOW_SIZE_MS_CONFIG` must exist as a variable 
name, I am wondering if it's actually correct that we added it as a 
StreamsConfig, ie, via `define(...)`? Maybe we should do a small KIP and remove 
it? -- In any case, we should not mention `window.size.ms` in KS docs on the 
web-page (at least not as a "top level config" -- we should either add it to a 
"windowed serde" section, or to the (console) consumer config section where it 
belongs).
   
   `windowed.inner.class.serde` is a KS config and should just be documented in 
the regular way.
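
   For illustration, a hedged sketch of reading windowed keys from a plain 
consumer (topic and group names are made up). The window size is metadata that 
only the deserializer needs -- which is exactly why the config never belongs to 
the Streams runtime:
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
   import org.apache.kafka.streams.kstream.Windowed;
   
   public class WindowedReader {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "windowed-reader");
   
           // Window size (here 500ms) is handed to the deserializer directly;
           // passing it via the `window.size.ms` config is just the console
           // consumer's way of reaching this same constructor argument.
           TimeWindowedDeserializer<String> keyDeserializer =
                   new TimeWindowedDeserializer<>(new StringDeserializer(), 500L);
   
           try (KafkaConsumer<Windowed<String>, String> consumer =
                        new KafkaConsumer<>(props, keyDeserializer, new StringDeserializer())) {
               consumer.subscribe(Collections.singletonList("windowed-output"));
               consumer.poll(Duration.ofMillis(1000)).forEach(r ->
                       System.out.println(r.key().window().start() + " -> " + r.value()));
           }
       }
   }
   ```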



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-01-31 Thread via GitHub


mjsax commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1473419962


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -482,7 +482,8 @@ public class StreamsConfig extends AbstractConfig {
 public static final String BUILT_IN_METRICS_VERSION_CONFIG = 
"built.in.metrics.version";
 private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the 
built-in metrics to use.";
 
-/** {@code cache.max.bytes.buffering} */
+/** {@code cache.max.bytes.buffering}
+ * @deprecated since 3.4.0 Use cache.max.bytes instead with the cache.size 
metric at the DEBUG level. */

Review Comment:
   ```suggestion
* @deprecated since 3.4.0 Use {@link #CACHE_MAX_BYTES_CONFIG 
"cache.max.bytes"} instead. */
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -537,14 +538,16 @@ public class StreamsConfig extends AbstractConfig {
 public static final String ROCKS_DB = "rocksDB";
 public static final String IN_MEMORY = "in_memory";
 
-/** {@code default.windowed.key.serde.inner} */
+/** {@code default.windowed.key.serde.inner
+ * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */

Review Comment:
   ```suggestion
* @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE 
"windowed.inner.class.serde"} instead.} */
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -482,7 +482,8 @@ public class StreamsConfig extends AbstractConfig {
 public static final String BUILT_IN_METRICS_VERSION_CONFIG = 
"built.in.metrics.version";
 private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the 
built-in metrics to use.";
 
-/** {@code cache.max.bytes.buffering} */
+/** {@code cache.max.bytes.buffering}
+ * @deprecated since 3.4.0 Use cache.max.bytes instead with the cache.size 
metric at the DEBUG level. */

Review Comment:
   Not sure if I understand the reference to `cache.size` metric? Can you 
elaborate?



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -537,14 +538,16 @@ public class StreamsConfig extends AbstractConfig {
 public static final String ROCKS_DB = "rocksDB";
 public static final String IN_MEMORY = "in_memory";
 
-/** {@code default.windowed.key.serde.inner} */
+/** {@code default.windowed.key.serde.inner
+ * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */
 @SuppressWarnings("WeakerAccess")
 @Deprecated
 public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = 
"default.windowed.key.serde.inner";
 private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = 
"Default serializer / deserializer for the inner class of a windowed key. Must 
implement the " +
 "org.apache.kafka.common.serialization.Serde interface.";
 
-/** {@code default.windowed.value.serde.inner} */
+/** {@code default.windowed.value.serde.inner
+ * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */

Review Comment:
   as above



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -620,7 +623,8 @@ public class StreamsConfig extends AbstractConfig {
 @SuppressWarnings("WeakerAccess")
 public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = 
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
-/** {@code auto.include.jmx.reporter} */
+/** {@code auto.include.jmx.reporter
+ * @deprecated and will removed in 4.0.0 Use 
org.apache.kafka.common.metrics.JmxReporter in metric.reporters instead.} */

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-31 Thread via GitHub


philipnee commented on code in PR #15216:
URL: https://github.com/apache/kafka/pull/15216#discussion_r1473330427


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+
+/**
+ * Base class for different consumer metrics to extend. This class helps to 
construct the logical group name from the
+ * given prefix and suffix, and provides a few common utilities.
+ *
+ * 
+ * The suffix can be one of the following:
+ * 
+ * -coordinator-metrics: {@link 
MetricGroupSuffix#COORDINATOR}
+ * -metrics: {@link MetricGroupSuffix#CONSUMER}
+ * 
+ * 
+ */
+public abstract class AbstractConsumerMetricsManager {

Review Comment:
   See the latest commit - i removed this class to simplify the implementation 
a bit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]

2024-01-31 Thread via GitHub


dajac commented on code in PR #15253:
URL: https://github.com/apache/kafka/pull/15253#discussion_r1473247269


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##
@@ -158,36 +295,71 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
 return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
 }
 
+private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+return assignment.topicPartitions().stream().flatMap(topic ->
+topic.partitions().stream().map(partition ->
+new TopicPartition(topic.topicName(), partition)
+)
+).collect(Collectors.toSet());
+}
+
 private void handleError(
 CoordinatorKey groupId,
 Errors error,
+String errorMsg,
Map<CoordinatorKey, Throwable> failed,
-Set<CoordinatorKey> groupsToUnmap
+Set<CoordinatorKey> groupsToUnmap,
+boolean isConsumerGroupResponse
 ) {
+String apiName = isConsumerGroupResponse ? "ConsumerGroupDescribe" : 
"DescribeGroups";
+
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-failed.put(groupId, error.exception());
+log.debug("`{}` request for group id {} failed due to error 
{}.", apiName, groupId.idValue, error);
+failed.put(groupId, error.exception(errorMsg));
 break;
 
 case COORDINATOR_LOAD_IN_PROGRESS:
 // If the coordinator is in the middle of loading, then we 
just need to retry
-log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-"is still in the process of loading state. Will retry", 
groupId.idValue);
+log.debug("`{}` request for group id {} failed because the 
coordinator " +
+"is still in the process of loading state. Will retry.", 
apiName, groupId.idValue);
 break;
 
 case COORDINATOR_NOT_AVAILABLE:
 case NOT_COORDINATOR:
 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
 // the key so that we retry the `FindCoordinator` request
-log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-"Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+log.debug("`{}` request for group id {} returned error {}. " +
+"Will attempt to find the coordinator again and retry.", 
apiName, groupId.idValue, error);
 groupsToUnmap.add(groupId);
 break;
 
+case UNSUPPORTED_VERSION:
+if (isConsumerGroupResponse) {
+log.debug("`{}` request for group id {} failed because the 
API is not " +
+"supported. Will retry with `DescribeGroups` API.", 
apiName, groupId.idValue);
+useClassicGroupApi.add(groupId.idValue);
+} else {
+log.error("`{}` request for group id {} because the 
`ConsumerGroupDescribe` API is not supported.",
+apiName, groupId.idValue);
+failed.put(groupId, error.exception(errorMsg));
+}
+break;
+
+case GROUP_ID_NOT_FOUND:
+if (isConsumerGroupResponse) {
+log.debug("`{}` request for group id {} failed because the 
group is not " +
+"a new consumer group. Will retry with 
`DescribeGroups` API.", apiName, groupId.idValue);
+useClassicGroupApi.add(groupId.idValue);
+} else {
+log.error("`{}` request for group id {} because the group 
does not exist.", apiName, groupId.idValue);

Review Comment:
   Good catch! Let me fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change Connect integration StopStartLatch::await to throw TimeoutException on timeout [kafka]

2024-01-31 Thread via GitHub


C0urante closed pull request #15178: MINOR: Change Connect integration 
StopStartLatch::await to throw TimeoutException on timeout
URL: https://github.com/apache/kafka/pull/15178


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change Connect integration StopStartLatch::await to throw TimeoutException on timeout [kafka]

2024-01-31 Thread via GitHub


C0urante commented on PR #15178:
URL: https://github.com/apache/kafka/pull/15178#issuecomment-1919630582

   This change wasn't really that valuable; the only information it added was 
trivial and exactly what one would expect to see (e.g., testing code that 
waited for a connector to be stopped once would report that it was still 
waiting for the connector to be stopped once--not exactly jaw-dropping insight).
   
   I've instead begun debugging our tests with full logs enabled on 
https://github.com/apache/kafka/pull/15229, which has proven much more useful.
   
   Closing this PR as it's not worth the review bandwidth.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15524, KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-01-31 Thread via GitHub


C0urante opened a new pull request, #15302:
URL: https://github.com/apache/kafka/pull/15302

   [Jira 1](https://issues.apache.org/jira/browse/KAFKA-15524), [Jira 
2](https://issues.apache.org/jira/browse/KAFKA-15917)
   
   In some rare cases, our integration tests observe that tasks are up and 
running for our sink connectors, and issue a subsequent request to stop the 
connector, before the consumers of these tasks have had a chance to form/join a 
group. When this happens, requests to alter/reset the offsets for the connector 
actually succeed, since the group either doesn't exist, or is completely empty.
   
   As a quick fix, we can wait for offsets to be committed for the connector 
before stopping it and attempting to modify its offsets. This goes a bit 
further than necessary (we should only have to wait for at least one task's 
consumer to have formed/joined a group), but off the top of my head it was the 
cleanest and briefest way to guarantee that a group had been formed. It also 
makes a little more sense in the context of the test, since there's not much 
use in modifying connector offsets when none exist yet.
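
   For reference, a hedged sketch of that wait (helper and constant names here 
are illustrative, not necessarily the exact ones used in the PR):
   
   ```java
   // Block until at least one offset commit is visible for the connector,
   // which implies the sink tasks' consumer group has actually formed.
   TestUtils.waitForCondition(
       () -> !connect.connectorOffsets(CONNECTOR_NAME).offsets().isEmpty(),
       OFFSET_COMMIT_TIMEOUT_MS,
       "Sink tasks never committed offsets; consumer group may not have formed"
   );
   connect.stopConnector(CONNECTOR_NAME);
   // Alter/reset offset requests now target a real, non-empty consumer group.
   ```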
   
   Opening as a draft; will wait for CI to complete before marking ready for 
review.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]

2024-01-31 Thread via GitHub


AndrewJSchofield commented on code in PR #15253:
URL: https://github.com/apache/kafka/pull/15253#discussion_r1473234152


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##
@@ -158,36 +295,71 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
 return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
 }
 
+private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+return assignment.topicPartitions().stream().flatMap(topic ->
+topic.partitions().stream().map(partition ->
+new TopicPartition(topic.topicName(), partition)
+)
+).collect(Collectors.toSet());
+}
+
 private void handleError(
 CoordinatorKey groupId,
 Errors error,
+String errorMsg,
Map<CoordinatorKey, Throwable> failed,
-Set<CoordinatorKey> groupsToUnmap
+Set<CoordinatorKey> groupsToUnmap,
+boolean isConsumerGroupResponse
 ) {
+String apiName = isConsumerGroupResponse ? "ConsumerGroupDescribe" : 
"DescribeGroups";
+
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-failed.put(groupId, error.exception());
+log.debug("`{}` request for group id {} failed due to error 
{}.", apiName, groupId.idValue, error);
+failed.put(groupId, error.exception(errorMsg));
 break;
 
 case COORDINATOR_LOAD_IN_PROGRESS:
 // If the coordinator is in the middle of loading, then we 
just need to retry
-log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-"is still in the process of loading state. Will retry", 
groupId.idValue);
+log.debug("`{}` request for group id {} failed because the 
coordinator " +
+"is still in the process of loading state. Will retry.", 
apiName, groupId.idValue);
 break;
 
 case COORDINATOR_NOT_AVAILABLE:
 case NOT_COORDINATOR:
 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
 // the key so that we retry the `FindCoordinator` request
-log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-"Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+log.debug("`{}` request for group id {} returned error {}. " +
+"Will attempt to find the coordinator again and retry.", 
apiName, groupId.idValue, error);
 groupsToUnmap.add(groupId);
 break;
 
+case UNSUPPORTED_VERSION:
+if (isConsumerGroupResponse) {
+log.debug("`{}` request for group id {} failed because the 
API is not " +
+"supported. Will retry with `DescribeGroups` API.", 
apiName, groupId.idValue);
+useClassicGroupApi.add(groupId.idValue);
+} else {
+log.error("`{}` request for group id {} because the 
`ConsumerGroupDescribe` API is not supported.",
+apiName, groupId.idValue);
+failed.put(groupId, error.exception(errorMsg));
+}
+break;
+
+case GROUP_ID_NOT_FOUND:
+if (isConsumerGroupResponse) {
+log.debug("`{}` request for group id {} failed because the 
group is not " +
+"a new consumer group. Will retry with 
`DescribeGroups` API.", apiName, groupId.idValue);
+useClassicGroupApi.add(groupId.idValue);
+} else {
+log.error("`{}` request for group id {} because the group 
does not exist.", apiName, groupId.idValue);

Review Comment:
   This doesn't seem grammatical. It would say "DescribeGroups request for 
group id {} because the group does not exist". I suggest "failed" or something 
like that is missing.



##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##
@@ -158,36 +295,71 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
 return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
 }
 
+private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+return assignment.topicPartitions().stream().flatMap(topic ->
+topic.partitions().stream().map(partition ->
+new TopicPartition(topic.topicName(), partition)
+)
+).collect(Collectors.toSet());
+}
+
 private void handleError(
 CoordinatorKey groupId,
 Errors error,
+String errorMsg,
Map<CoordinatorKey, Throwable> failed,
-Set<CoordinatorKey> groupsToUnmap
+ 

[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-01-31 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812824#comment-17812824
 ] 

Lianet Magrans commented on KAFKA-16212:


Similar situation and approach on the client side btw [~jolshan]. With the new 
consumer and KIP-848 we're moving in the direction of internally spreading the 
use of topic IDs more...step by step.
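
A small self-contained illustration of the ambiguity the ticket describes (a sketch, not code from the PR): after a topic is deleted and recreated with the same name, a {{TopicPartition}}-keyed map silently conflates the two generations, while a {{TopicIdPartition}} key keeps them distinct.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public class KeyAmbiguity {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("payments", 0);

        // Two "generations" of the same topic name, e.g. after delete + recreate:
        TopicIdPartition oldGen = new TopicIdPartition(Uuid.randomUuid(), tp);
        TopicIdPartition newGen = new TopicIdPartition(Uuid.randomUuid(), tp);

        Map<TopicPartition, String> byName = new HashMap<>();
        byName.put(tp, "old state");
        byName.put(tp, "new state");       // silently overwrites: ambiguous

        Map<TopicIdPartition, String> byId = new HashMap<>();
        byId.put(oldGen, "old state");
        byId.put(newGen, "new state");     // both retained: unambiguous

        System.out.println(byName.size()); // 1
        System.out.println(byId.size());   // 2
    }
}
{code}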

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16213) KRaft should honor voterId outside of VotedState

2024-01-31 Thread Jira
José Armando García Sancio created KAFKA-16213:
--

 Summary: KRaft should honor voterId outside of VotedState
 Key: KAFKA-16213
 URL: https://issues.apache.org/jira/browse/KAFKA-16213
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


The current implementation of KRaft only stores the id of the replica for which 
it voted when it is in the VotedState. When it transitions to other states like 
Follower, Leader, Resigned, etc. it doesn't continue to remember and persist 
the replica id of the replica for which it voted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16168; Implement GroupCoordinator.onPartitionsDeleted [kafka]

2024-01-31 Thread via GitHub


jolshan commented on code in PR #15237:
URL: https://github.com/apache/kafka/pull/15237#discussion_r1473219011


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, 
List<Record> records) {
 return allOffsetsExpired.get() && 
!openTransactionsByGroup.containsKey(groupId);
 }
 
+/**
+ * Remove offsets of the partitions that have been deleted.
+ *
+ * @param topicPartitions   The partitions that have been deleted.
+ * @return The list of tombstones (offset commit) to append.
+ */
+public List<Record> onPartitionsDeleted(

Review Comment:
   Thanks for the thorough response. Makes sense to me.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-01-31 Thread via GitHub


gaurav-narula commented on code in PR #15262:
URL: https://github.com/apache/kafka/pull/15262#discussion_r1473210077


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig,
 // retrieve the UUID here because logManager.handleLogDirFailure handler 
removes it
 val uuid = logManager.directoryId(dir)
 logManager.handleLogDirFailure(dir)
-if (dir == config.metadataLogDir) {
+if (dir == new File(config.metadataLogDir).getAbsolutePath && 
(zkClient.isEmpty || config.migrationEnabled)) {

Review Comment:
   Addressed in 
https://github.com/apache/kafka/pull/15262/commits/b28e21a5f96d580e4463e20eb73738a157eb440c



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-01-31 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812822#comment-17812822
 ] 

Justine Olshan commented on KAFKA-16212:


When working on KIP-516 (topic IDs) there were many areas that topic IDs could 
help but changing them all in one go would have been difficult. I'm happy to 
see more steps in this direction :) 

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-01-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15524:
-

Assignee: Chris Egerton

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>  at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> 

[jira] [Assigned] (KAFKA-15917) Flaky test - OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks

2024-01-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15917:
-

Assignee: Chris Egerton

> Flaky test - 
> OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
> ---
>
> Key: KAFKA-15917
> URL: https://issues.apache.org/jira/browse/KAFKA-15917
> Project: Kafka
>  Issue Type: Bug
>Reporter: Haruki Okada
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
> Attachments: stdout.log
>
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/]
>  
>  
> {code:java}
> Error
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> Stacktrace
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:431)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
> at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at 
> 

Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-01-31 Thread via GitHub


cmccabe commented on code in PR #15262:
URL: https://github.com/apache/kafka/pull/15262#discussion_r1473174695


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig,
 // retrieve the UUID here because logManager.handleLogDirFailure handler 
removes it
 val uuid = logManager.directoryId(dir)
 logManager.handleLogDirFailure(dir)
-if (dir == config.metadataLogDir) {
+if (dir == new File(config.metadataLogDir).getAbsolutePath && 
(zkClient.isEmpty || config.migrationEnabled)) {

Review Comment:
   This is not quite the correct check... you should check 
`config.processRoles` (probably `config.processRoles.isNotEmpty || 
config.migrationEnabled`)



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig,
 // retrieve the UUID here because logManager.handleLogDirFailure handler 
removes it
 val uuid = logManager.directoryId(dir)
 logManager.handleLogDirFailure(dir)
-if (dir == config.metadataLogDir) {
+if (dir == new File(config.metadataLogDir).getAbsolutePath && 
(zkClient.isEmpty || config.migrationEnabled)) {

Review Comment:
   This is not quite the correct check... you should check 
`config.processRoles` (probably `config.processRoles.isNotEmpty || 
config.migrationEnabled` )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-01-31 Thread via GitHub


cmccabe commented on code in PR #15262:
URL: https://github.com/apache/kafka/pull/15262#discussion_r1473174695


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig,
 // retrieve the UUID here because logManager.handleLogDirFailure handler 
removes it
 val uuid = logManager.directoryId(dir)
 logManager.handleLogDirFailure(dir)
-if (dir == config.metadataLogDir) {
+if (dir == new File(config.metadataLogDir).getAbsolutePath && 
(zkClient.isEmpty || config.migrationEnabled)) {

Review Comment:
   This is not the correct check... you should check `config.processRoles`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473168658


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##


Review Comment:
   Yeah, I can do that. I'm a little concerned that we might forget to move it 
back though!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473167769


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1841,33 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+// track the size of the transaction buffer on each iteration to predict 
when it will be exceeded in advance
+private long lastUncommittedBytes = 0L;
+
+boolean needsCommit(final boolean updateDelta) {
+if (maxUncommittedStateBytes < 0) {
+// if our transaction buffers are unbounded, we never need to 
force an early commit
+return false;
+}
+
+// force an early commit if the uncommitted bytes exceeds or is 
*likely to exceed* the configured threshold
+final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+final long deltaBytes = Math.max(0, uncommittedBytes - 
lastUncommittedBytes);
+
+final boolean needsCommit =  maxUncommittedStateBytes > -1 && 
uncommittedBytes + deltaBytes > maxUncommittedStateBytes;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473167034


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1841,33 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+// track the size of the transaction buffer on each iteration to predict 
when it will be exceeded in advance
+private long lastUncommittedBytes = 0L;
+
+boolean needsCommit(final boolean updateDelta) {
+if (maxUncommittedStateBytes < 0) {
+// if our transaction buffers are unbounded, we never need to 
force an early commit
+return false;
+}
+
+// force an early commit if the uncommitted bytes exceeds or is 
*likely to exceed* the configured threshold
+final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+final long deltaBytes = Math.max(0, uncommittedBytes - 
lastUncommittedBytes);
+
+final boolean needsCommit =  maxUncommittedStateBytes > -1 && 
uncommittedBytes + deltaBytes > maxUncommittedStateBytes;

Review Comment:
   Yep! I did some refactoring and didn't spot the dead code :smile: 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1841,33 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+// track the size of the transaction buffer on each iteration to predict 
when it will be exceeded in advance
+private long lastUncommittedBytes = 0L;
+
+boolean needsCommit(final boolean updateDelta) {
+if (maxUncommittedStateBytes < 0) {
+// if our transaction buffers are unbounded, we never need to 
force an early commit
+return false;
+}
+
+// force an early commit if the uncommitted bytes exceeds or is 
*likely to exceed* the configured threshold
+final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+final long deltaBytes = Math.max(0, uncommittedBytes - 
lastUncommittedBytes);
+
+final boolean needsCommit =  maxUncommittedStateBytes > -1 && 
uncommittedBytes + deltaBytes > maxUncommittedStateBytes;

Review Comment:
   Woops! I did some refactoring and didn't spot the dead code :smile: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473166531


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1841,33 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+// track the size of the transaction buffer on each iteration to predict 
when it will be exceeded in advance
+private long lastUncommittedBytes = 0L;
+
+boolean needsCommit(final boolean updateDelta) {
+if (maxUncommittedStateBytes < 0) {
+// if our transaction buffers are unbounded, we never need to 
force an early commit
+return false;
+}

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473165230


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1841,33 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+// track the size of the transaction buffer on each iteration to predict 
when it will be exceeded in advance
+private long lastUncommittedBytes = 0L;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-31 Thread via GitHub


philipnee commented on code in PR #15216:
URL: https://github.com/apache/kafka/pull/15216#discussion_r1473160823


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+
+/**
+ * Base class for different consumer metrics to extend. This class helps to 
construct the logical group name from the
+ * given prefix and suffix, and provides a few common utilities.
+ *
+ * 
+ * The suffix can be one of the following:
+ * 
+ * -coordinator-metrics: {@link 
MetricGroupSuffix#COORDINATOR}
+ * -metrics: {@link MetricGroupSuffix#CONSUMER}
+ * 
+ * 
+ */
+public abstract class AbstractConsumerMetricsManager {

Review Comment:
   are you suggesting making this a utility class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473153338


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1097,10 +1102,10 @@ void handleRevocation(final Collection 
revokedPartitions) {
 for (final Task task : commitNeededActiveTasks) {
 if (!dirtyTasks.contains(task)) {
 try {
-// for non-revoking active tasks, we should not 
enforce checkpoint
-// since if it is EOS enabled, no checkpoint should be 
written while
-// the task is in RUNNING state
-task.postCommit(false);
+// we only enforce a checkpoint if the transaction 
buffers are full
+// to avoid unnecessary flushing of stores under EOS
+final boolean enforceCheckpoint = 
maxUncommittedStateBytes > -1 && tasks.approximateUncommittedStateBytes() >= 
maxUncommittedStateBytes;
+task.postCommit(enforceCheckpoint);

Review Comment:
   Same as above.
   
   That said, I'm not convinced this particular change is necessary. This code 
is run during a rebalance, and AFAIK no records will be processed during a 
rebalance; it shouldn't be possible for the transaction buffer to have 
grown since the last time we checked in the `StreamThread` main run-loop 
(aka `StreamThread#maybeCommit`).
   
   Worst case scenario, if we leave this as `task.postCommit(false)`, and the 
state stores aren't flushed during a rebalance, if the transaction buffer is 
already close to exceeding its capacity, it will be detected during the next 
iteration of the run-loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-01-31 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-16212:
-

Assignee: Omnia Ibrahim

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473141072


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final 
Collection tasksToCo
 if (task.commitNeeded()) {
 task.clearTaskTimeout();
 ++committed;
-task.postCommit(false);
+// under EOS, we need to enforce a checkpoint if our 
transaction buffers will exceed their capacity

Review Comment:
   Actually, at this point we've only committed our Kafka transaction. The 
StateStore has not been committed/flushed, so our transaction buffers will 
still be full.
   
   `task.postCommit` is the process that "checkpoints" state stores. We need to 
enforce a checkpoint when the transaction buffer is full, because we only flush 
the StateStore when it is checkpointed. See `AbstractTask` line `99`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-01-31 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473126369


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1374,7 +1376,7 @@ public void signalResume() {
  */
 int maybeCommit() {
 final int committed;
-if (now - lastCommitMs > commitTimeMs) {
+if (taskManager.needsCommit(true) || now - lastCommitMs > 
commitTimeMs) {

Review Comment:
   I've added a test for this in 0577905, but it feels like a bit of a 
tautology: it essentially just tests that `commit()` is called when 
`taskManager.needsCommit(anyBoolean())` returns `true`.
   
   The real meat of this will be in the `TaskManagerTests`, which I'm working 
on separately.
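
   Roughly, the shape of that test is something like this Mockito sketch (a 
sketch only; mock and method names are from memory, not necessarily the exact 
ones in the test):
   
   ```java
   // Inside StreamThreadTest, with a mocked TaskManager wired into the thread:
   when(taskManager.needsCommit(anyBoolean())).thenReturn(true);
   thread.maybeCommit();
   // The size-based trigger alone should force a commit:
   verify(taskManager).commit(any());
   ```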



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-31 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280

   Thanks for the in-depth analysis! I think part of the problem here stems 
from trying to make our internal `Future`-based API cooperate with Java's 
`CompletableFuture` API. I've sketched out something below that doesn't rely 
on `CompletableFuture` and (I believe) preserves the semantics we want for 
asynchronicity:
   
   ```java
    public class ConnectorOffsetBackingStore implements OffsetBackingStore {

        @Override
        public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
            final OffsetBackingStore primaryStore;
            final OffsetBackingStore secondaryStore;
            if (connectorStore.isPresent()) {
                primaryStore = connectorStore.get();
                secondaryStore = workerStore.orElse(null);
            } else if (workerStore.isPresent()) {
                primaryStore = workerStore.get();
                secondaryStore = null;
            } else {
                // Should never happen since we check for this case in the constructor, but just in case,
                // this should be more informative than the NPE that would otherwise be thrown
                throw new IllegalStateException("At least one non-null offset store must be provided");
            }

            Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
            values.forEach((partition, offset) -> {
                if (offset == null) {
                    tombstoneOffsets.put(partition, null);
                } else {
                    regularOffsets.put(partition, offset);
                }
            });

            if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
                return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                    if (tombstoneWriteError != null) {
                        log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                        try (LoggingContext context = loggingContext()) {
                            callback.onCompletion(tombstoneWriteError, ignored);
                        }
                        return;
                    }

                    setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
                });
            } else {
                return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            }
        }

        private Future<Void> setPrimaryThenSecondary(
                OffsetBackingStore primaryStore,
                OffsetBackingStore secondaryStore,
                Map<ByteBuffer, ByteBuffer> completeOffsets,
                Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
                Callback<Void> callback
        ) {
            return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
                if (secondaryStore != null) {
                    if (primaryWriteError != null) {
                        log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                    } else {
                        try {
                            // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block
                            // on writes to this backing store.
                            secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                                try (LoggingContext context = loggingContext()) {
                                    if (secondaryWriteError != null) {
                                        log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                    } else {
                                        log.debug("Successfully flushed offsets to secondary backing store");
                                    }
                                }
                            });
                        } catch (Exception e) {
                            log.warn("Failed to write offsets to secondary backing store", e);
                        }
                    }
                }
                try (LoggingContext context = loggingContext()) {
                    callback.onCompletion(primaryWriteError, ignored);
                }
            });
        }
    }
   ```
   
   It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` 
fields.
   
   If this looks acceptable, I think the only question left is whether 
out-of-order writes are possible because of how things are chained. I believe 
this is only a problem for non-exactly-once source tasks (since we only have at 
most one in-flight offset commit at a time when exactly-once support is 
enabled), and should be handled gracefully by 
`OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of 
eyes to make sure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]

2024-01-31 Thread via GitHub


jlprat commented on code in PR #15279:
URL: https://github.com/apache/kafka/pull/15279#discussion_r1473125018


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -92,18 +92,18 @@ object RequestChannel extends Logging {
 val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
  // These need to be volatile because the readers are in the network thread and the writers are in the request
  // handler threads or the purgatory threads
-@volatile var requestDequeueTimeNanos = -1L
-@volatile var apiLocalCompleteTimeNanos = -1L
-@volatile var responseCompleteTimeNanos = -1L
-@volatile var responseDequeueTimeNanos = -1L
-@volatile var messageConversionsTimeNanos = 0L
-@volatile var apiThrottleTimeMs = 0L
-@volatile var temporaryMemoryBytes = 0L
+@volatile var requestDequeueTimeNanos: Long = -1L
+@volatile var apiLocalCompleteTimeNanos: Long = -1L
+@volatile var responseCompleteTimeNanos: Long = -1L
+@volatile var responseDequeueTimeNanos: Long = -1L
+@volatile var messageConversionsTimeNanos: Long = 0L
+@volatile var apiThrottleTimeMs: Long = 0L
+@volatile var temporaryMemoryBytes: Long = 0L
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 @volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
 @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
 
-val session = Session(context.principal, context.clientAddress)
+val session: Session = Session(context.principal, context.clientAddress)

Review Comment:
   The changes made in this PR reflect the current (as of Scala 2.12 and 2.13) 
idiomatic way to write Scala. For example: Boolean arguments should be passed 
by name, parameterless idempotent methods may omit the parentheses, 
side-effecting parameterless methods should keep them, and so on; see the 
illustration below.
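   
   A minimal Scala illustration of those conventions (examples invented for 
this note, not code from the PR):
   
   ```scala
   class Idioms(items: Vector[Int], socket: java.net.Socket) {
     // Idempotent and parameterless: declared (and called) without parentheses
     def size: Int = items.length

     // Side-effecting: declared (and called) with parentheses
     def close(): Unit = socket.close()

     // Boolean parameters are named at the call site
     def process(records: Seq[String], flush: Boolean): Unit =
       if (flush) records.foreach(_ => ()) // no-op body, illustration only

     def run(): Unit = process(Seq("a", "b"), flush = true)
   }
   ```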



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


