Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lucasbru merged PR #15271: URL: https://github.com/apache/kafka/pull/15271 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]
yashmayya commented on code in PR #15249: URL: https://github.com/apache/kafka/pull/15249#discussion_r1473904200 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java: ## @@ -963,11 +962,19 @@ public Set activeWorkers() { return workers().stream() Review Comment: I'm unable to comment on that line directly, but the `ObjectMapper` initialized in the line above can be removed now.
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16156: -- Component/s: system tests > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. 
Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at >
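The log shows the request used target timestamp -1 (`{input-topic-0=-1}`) and the broker answered with `offset=42804, timestamp=-1`; the exception is raised while `OffsetFetcherUtils.buildOffsetsForTimesResult` constructs an `OffsetAndTimestamp`, whose constructor rejects negative timestamps, even though `endOffsets()` only needs the offset. The following is a minimal, self-contained sketch of that failure mode and of one way to sidestep it (mapping end-offset results straight to offsets). `ListOffsetResult`, `StrictOffsetAndTimestamp`, `toOffsetsForTimes` and `toEndOffsets` are hypothetical stand-ins, not the Kafka client API, and this is not necessarily how the issue was fixed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for one partition's ListOffsets answer; the log above
// shows offset=42804 together with timestamp=-1.
final class ListOffsetResult {
    final long offset;
    final long timestamp;

    ListOffsetResult(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Hypothetical mirror of the check that fails in the trace: the constructor
// refuses a negative timestamp.
final class StrictOffsetAndTimestamp {
    final long offset;
    final long timestamp;

    StrictOffsetAndTimestamp(long offset, long timestamp) {
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

final class EndOffsetsSketch {
    // Mirrors the failing path: every result is wrapped in a timestamped
    // holder, which throws as soon as a result carries timestamp -1.
    static Map<String, StrictOffsetAndTimestamp> toOffsetsForTimes(Map<String, ListOffsetResult> results) {
        Map<String, StrictOffsetAndTimestamp> out = new LinkedHashMap<>();
        for (Map.Entry<String, ListOffsetResult> e : results.entrySet())
            out.put(e.getKey(), new StrictOffsetAndTimestamp(e.getValue().offset, e.getValue().timestamp));
        return out;
    }

    // Path for end/beginning offsets: only the offset is needed, so the
    // timestamp is never validated.
    static Map<String, Long> toEndOffsets(Map<String, ListOffsetResult> results) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (Map.Entry<String, ListOffsetResult> e : results.entrySet())
            out.put(e.getKey(), e.getValue().offset);
        return out;
    }
}
```

With the result from the log, `toEndOffsets` returns `{input-topic-0=42804}`, while `toOffsetsForTimes` throws `IllegalArgumentException: Invalid negative timestamp`, matching the trace.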
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16156: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp".
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16115: -- Labels: consumer-threading-refactor metrics (was: consumer-threading-refactor) > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
[ https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16112: -- Labels: consumer-threading-refactor metrics (was: consumer-threading-refactor) > Review JMX metrics in Async Consumer and determine the missing ones > --- > > Key: KAFKA-16112 > URL: https://issues.apache.org/jira/browse/KAFKA-16112 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16113: -- Labels: consumer-threading-refactor metrics (was: consumer-threading-refactor) > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total
[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics
[ https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16142: -- Labels: consumer-threading-refactor metrics (was: consumer-threading-refactor) > Update metrics documentation for errors and new metrics > --- > > Key: KAFKA-16142 > URL: https://issues.apache.org/jira/browse/KAFKA-16142 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16116: -- Labels: consumer-threading-refactor metrics (was: consumer-threading-refactor) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Updated] (KAFKA-16143) New metrics for KIP-848 protocol
[ https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16143: -- Labels: kip-848-client-support metrics (was: kip-848-client-support) > New metrics for KIP-848 protocol > > > Key: KAFKA-16143 > URL: https://issues.apache.org/jira/browse/KAFKA-16143 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support, metrics > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16109: -- Component/s: system tests > Ensure system tests cover the "simple consumer + commit" use case > - > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values.
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781
[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
[ https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15635: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric > - > > Key: KAFKA-15635 > URL: https://issues.apache.org/jira/browse/KAFKA-15635 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there > is only 1 assigned partition?
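One possible explanation, offered only as a hypothesis: if the aggregate lead metric keeps a minimum over every sample recorded in its window while the per-partition value reports the most recent sample, the two can differ even with a single assigned partition, because the lead changes from one fetch to the next. The sketch below models just that distinction. `LeadMetricsSketch` and its methods are hypothetical and are not Kafka's `Sensor`/`Metrics` classes; modelling lead as position minus log start offset is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a single partition whose lead is recorded once per fetch.
final class LeadMetricsSketch {
    private final List<Long> samples = new ArrayList<>();

    // Assumption: lead is modelled as position minus log start offset.
    void recordFetch(long position, long logStartOffset) {
        samples.add(position - logStartOffset);
    }

    // Minimum over every recorded sample (this sketch never expires samples).
    long leadMin() {
        long min = Long.MAX_VALUE;
        for (long s : samples) min = Math.min(min, s);
        return min;
    }

    // Most recent sample, like a "current value" gauge.
    // Assumes at least one fetch has been recorded.
    long latestLead() {
        return samples.get(samples.size() - 1);
    }
}
```

With two fetches advancing the position from 10 to 15 over a log starting at offset 0, `leadMin()` returns 10 and `latestLead()` returns 15. Whether the real test hits this case would still need checking against the actual sensor definitions.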
[jira] [Updated] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16000: -- Labels: consumer-threading-refactor unit-tests (was: consumer-threading-refactor) > Migrate MembershipManagerImplTest away from ConsumerTestBuilder > --- > > Key: KAFKA-16000 > URL: https://issues.apache.org/jira/browse/KAFKA-16000 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{setWakeupHook}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{networkClientDelegate.disconnectAsync}}.
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Labels: consumer-threading-refactor unit-tests (was: consumer-threading-refactor) > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test.
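The observation that removing `doThrow` changed nothing suggests the stubbed failure is never reached on the path the test drives. A common remedy is to assert that the injected failure actually fired before asserting that it was swallowed. The plain-Java sketch below shows that pattern without Mockito; `FailingResetManager`, `EventLoop` and `runOnce` are hypothetical stand-ins for the real test fixtures, not the consumer's classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical request manager whose reset call can be forced to fail.
final class FailingResetManager {
    boolean failOnReset;
    int resetCalls;

    void resetPositionsIfNeeded() {
        resetCalls++;
        if (failOnReset) throw new NullPointerException("injected");
    }
}

// Hypothetical event loop: drains queued events and ignores failures.
final class EventLoop {
    final Queue<String> events = new ArrayDeque<>();
    final FailingResetManager manager;
    int ignoredFailures;

    EventLoop(FailingResetManager manager) {
        this.manager = manager;
    }

    void runOnce() {
        String event;
        while ((event = events.poll()) != null) {
            if (!event.equals("reset-positions")) continue;
            try {
                manager.resetPositionsIfNeeded();
            } catch (RuntimeException e) {
                ignoredFailures++; // swallowed, which is what the test wants to prove
            }
        }
    }
}
```

The checks on `resetCalls` and `ignoredFailures` are what keep such a test honest: if the stub is bypassed, one of them stays at 0 and the test fails instead of passing vacuously.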
[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15652: -- Labels: consumer-threading-refactor position unit-tests (was: consumer-threading-refactor position) > Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp() > > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, position, unit-tests > Fix For: 3.8.0 > > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{FetchCollector.handleInitializeErrors()}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{poll}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{OffsetOutOfRangeException}}. > {quote} > I intentionally changed {{OffsetFetcherUtils.getOffsetResetTimestamp()}} so that it threw no exceptions and simply returned an > empty map. When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case.
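The missing coverage can be pinned down by a focused unit test. The sketch below assumes, hypothetically, that a reset-timestamp lookup should fail loudly when no default reset policy exists instead of silently skipping the partition. `ResetPolicy`, `NoResetPolicyException` and `resetTimestamp` are illustrative names, not the real `OffsetFetcherUtils` API; the `-2`/`-1` sentinels follow the ListOffsets convention for "earliest" and "latest".

```java
// Hypothetical reset policies; NONE models auto.offset.reset=none.
enum ResetPolicy { EARLIEST, LATEST, NONE }

// Hypothetical exception signalling that no reset policy applies.
final class NoResetPolicyException extends RuntimeException {
    NoResetPolicyException(String partition) {
        super("No reset policy for partition " + partition);
    }
}

final class ResetTimestampSketch {
    // ListOffsets sentinel timestamps for "earliest" and "latest".
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Maps a policy to the timestamp to query; NONE must surface an error
    // rather than leave the partition stuck in the reset state.
    static long resetTimestamp(String partition, ResetPolicy policy) {
        switch (policy) {
            case EARLIEST: return EARLIEST_TIMESTAMP;
            case LATEST: return LATEST_TIMESTAMP;
            default: throw new NoResetPolicyException(partition);
        }
    }
}
```

A test asserting that the `NONE` case throws would have failed under the experiment described above, where the method was changed to swallow the error, which is exactly the coverage the issue reports as missing.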
[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid
[ https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15991: -- Labels: consumer-threading-refactor flaky-test kip-848-client-support unit-tests (was: consumer-threading-refactor flaky-test kip-848-client-support) > Flaky new consumer test testGroupIdNotNullAndValid > -- > > Key: KAFKA-15991 > URL: https://issues.apache.org/jira/browse/KAFKA-15991 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: Bruno Cadonna >Priority: Major > Labels: consumer-threading-refactor, flaky-test, > kip-848-client-support, unit-tests > Fix For: 3.7.0 > > > The test fails locally when run in a loop with its latest changes from > [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988]. > It failed the build, so it was temporarily disabled.
[jira] [Updated] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15999: -- Labels: consumer-threading-refactor unit-tests (was: consumer-threading-refactor) > Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder > - > > Key: KAFKA-15999 > URL: https://issues.apache.org/jira/browse/KAFKA-15999 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Labels: consumer-threading-refactor unit-tests (was: consumer-threading-refactor) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > > > Regarding this comment in {{testPollResultTimer}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote}
[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16001: -- Labels: consumer-threading-refactor unit-tests (was: consumer-threading-refactor) > Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder > --- > > Key: KAFKA-16001 > URL: https://issues.apache.org/jira/browse/KAFKA-16001 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > {{expectedBytes}} is calculated as a total instead of an average. Is this correct?
[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14724: -- Labels: consumer-threading-refactor fetcher kip-848-e2e kip-848-preview unit-tests (was: consumer-threading-refactor fetcher kip-848-e2e kip-848-preview) > Port tests in FetcherTest to FetchRequestManagerTest > > > Key: KAFKA-14724 > URL: https://issues.apache.org/jira/browse/KAFKA-14724 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher, kip-848-e2e, > kip-848-preview, unit-tests > Fix For: 3.7.0 > > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{Fetcher}}. > This task involves copying the relevant tests from {{FetcherTest}} and > modifying them to fit a new unit test named {{FetchRequestManagerTest}}.
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Labels: consumer-threading-refactor fetcher unit-tests (was: consumer-threading-refactor fetcher) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > Fix For: 4.0.0 > > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15971) Re-enable consumer integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15971: -- Component/s: (was: unit tests) > Re-enable consumer integration tests for new consumer > - > > Key: KAFKA-15971 > URL: https://issues.apache.org/jira/browse/KAFKA-15971 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, integration-tests, kip-848, > kip-848-preview > Fix For: 3.7.0 > > > Re-enable the consumer integration tests for the new consumer, making sure > that build stability is not impacted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15971) Re-enable consumer integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15971: -- Labels: consumer-threading-refactor integration-tests kip-848 kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-preview system-tests) > Re-enable consumer integration tests for new consumer > - > > Key: KAFKA-15971 > URL: https://issues.apache.org/jira/browse/KAFKA-15971 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, integration-tests, kip-848, > kip-848-preview > Fix For: 3.7.0 > > > Re-enable the consumer integration tests for the new consumer, making sure > that build stability is not impacted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15932: -- Labels: consumer-threading-refactor flaky-test integration-tests kip-848-client-support (was: consumer-threading-refactor flaky-test integration-tests kip-848 kip-848-client-support) > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > --- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, flaky-test, > integration-tests, kip-848-client-support > Fix For: 3.7.0 > > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. 
> at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161) > at > app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128) > at > app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at >
[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15932: -- Labels: consumer-threading-refactor flaky-test integration-tests kip-848 kip-848-client-support (was: consumer-threading-refactor flaky-test kip-848 kip-848-client-support system-tests) > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > --- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, flaky-test, > integration-tests, kip-848, kip-848-client-support > Fix For: 3.7.0 > > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. 
[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15932: -- Component/s: (was: unit tests) > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > --- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, flaky-test, kip-848, > kip-848-client-support, system-tests > Fix For: 3.7.0 > > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. 
[jira] [Updated] (KAFKA-15986) New consumer group protocol integration test failures
[ https://issues.apache.org/jira/browse/KAFKA-15986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15986: -- Component/s: (was: unit tests) > New consumer group protocol integration test failures > - > > Key: KAFKA-15986 > URL: https://issues.apache.org/jira/browse/KAFKA-15986 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, integration-tests, kip-848, > kip-848-client-support > Fix For: 3.7.0 > > > A recent change in `AsyncKafkaConsumer.updateFetchPositions` has made > fetching fail without returning records in some situations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid
[ https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15991: -- Labels: consumer-threading-refactor flaky-test kip-848-client-support (was: consumer-threading-refactor flaky-test kip-848 kip-848-client-support system-tests) > Flaky new consumer test testGroupIdNotNullAndValid > -- > > Key: KAFKA-15991 > URL: https://issues.apache.org/jira/browse/KAFKA-15991 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: Bruno Cadonna >Priority: Major > Labels: consumer-threading-refactor, flaky-test, > kip-848-client-support > Fix For: 3.7.0 > > > Fails locally when run in a loop with its latest changes from > [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988]. > It failed the build, so it was temporarily disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15515: -- Component/s: (was: unit tests) > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 3.7.0 > > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > generated to catch regressions and validate functionality in the new consumer > while in development. It should be deleted when the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed for both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exists in the file > PlaintextConsumerTest. Those tests cannot be executed as such for the new > consumer implementation, for two main reasons: > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > In order to run the subset of tests that the new consumer > supports while the implementation is being completed, it was decided to: > - make a copy of the `PlaintextConsumerTest` class, named > PlaintextAsyncConsumer. > - leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15986) New consumer group protocol integration test failures
[ https://issues.apache.org/jira/browse/KAFKA-15986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15986: -- Labels: consumer-threading-refactor integration-tests kip-848 kip-848-client-support (was: consumer-threading-refactor kip-848 kip-848-client-support system-tests) > New consumer group protocol integration test failures > - > > Key: KAFKA-15986 > URL: https://issues.apache.org/jira/browse/KAFKA-15986 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, integration-tests, kip-848, > kip-848-client-support > Fix For: 3.7.0 > > > A recent change in `AsyncKafkaConsumer.updateFetchPositions` has made > fetching fail without returning records in some situations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15515: -- Labels: consumer-threading-refactor integration-tests (was: consumer-threading-refactor system-tests) > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, integration-tests > Fix For: 3.7.0 > > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > generated to catch regressions and validate functionality in the new consumer > while in development. It should be deleted when the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed for both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exists in the file > PlaintextConsumerTest. Those tests cannot be executed as such for the new > consumer implementation, for two main reasons: > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > In order to run the subset of tests that the new consumer > supports while the implementation is being completed, it was decided to: > - make a copy of the `PlaintextConsumerTest` class, named > PlaintextAsyncConsumer. > - leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16167: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support system-tests) > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16167: -- Component/s: (was: unit tests) > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16109: -- Component/s: (was: system tests) > Ensure system tests cover the "simple consumer + commit" use case > - > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16109: -- Labels: consumer-threading-refactor system-tests (was: consumer-threading-refactor) > Ensure system tests cover the "simple consumer + commit" use case > - > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16104: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests (was: consumer-threading-refactor) > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testMultiConsumerSessionTimeoutOnStopPolling > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation
[ https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15993: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests kip-848-preview timeout (was: consumer-threading-refactor kip-848-preview timeout) > Enable max poll integration tests that depend on callback invocation > > > Key: KAFKA-15993 > URL: https://issues.apache.org/jira/browse/KAFKA-15993 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, integration-tests, > kip-848-preview, timeout > Fix For: 3.8.0 > > > We will enable integration tests using the async consumer in KAFKA-15971. > However, we should also enable tests that rely on rebalance listeners after > KAFKA-15628 is closed. One example would be testMaxPollIntervalMs, which > relies on the listener to verify correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and > {{AsyncKafkaConsumer}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API.
Here's a bit of code that illustrates the difference between the two > approaches. > {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0 and the > enqueued operation has not completed yet, it _immediately_ throws a {{TimeoutException}}. > h3. Suggested fix > TBD :(
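The following self-contained Java sketch reproduces the difference with plain CompletableFutures rather than Kafka classes. The method names legacyStyle and asyncStyle are hypothetical, and a Deque stands in both for the I/O driven by NetworkClient.poll() and for the applicationEventQueue; this is an illustration of the timing behaviour described above, not the consumers' actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class TimeoutSemanticsDemo {

    // Legacy style: the calling thread drives the pending I/O itself (standing in
    // for NetworkClient.poll()) and checks for a result *before* checking the
    // timer, so even timeoutMs == 0 gets one chance to return a result.
    static int legacyStyle(Supplier<Integer> request, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        Deque<Runnable> pendingIo = new ArrayDeque<>();
        do {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(request, pendingIo::add);
            // "poll": run whatever I/O is pending, on this thread
            Runnable io;
            while ((io = pendingIo.poll()) != null) {
                io.run();
            }
            if (future.isDone()) {
                return future.join();
            }
        } while (System.nanoTime() - deadline < 0);
        return -1;
    }

    // Async style: the work is only enqueued (nobody drains the queue in this
    // demo, like a background thread that has not picked it up yet), and the
    // caller blocks on Future.get() with the timeout.
    static int asyncStyle(Deque<Runnable> applicationEventQueue, Supplier<Integer> request, long timeoutMs) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(request, applicationEventQueue::add);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1; // not completed within timeoutMs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With a timeout of 0, legacyStyle(() -> 42, 0) returns 42, while asyncStyle(queue, () -> 42, 0) returns -1 and leaves the work sitting in queue.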
[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding
[ https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16023: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > PlaintextConsumerTest needs to wait for reconciliation to complete before > proceeding > > > Key: KAFKA-16023 > URL: https://issues.apache.org/jira/browse/KAFKA-16023 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > Several tests in PlaintextConsumerTest.scala (such as > testPerPartitionLagMetricsCleanUpWithSubscribe) use: > assertEquals(1, listener.callsToAssigned, "should be assigned once") > However, because reconciliation is processed asynchronously, the time at which > it completes is not deterministic, so the tests need to wait until the > condition holds. > Another issue is the timeout: some of these tasks might not > complete within the 600ms timeout, which makes the tests flaky.
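A minimal Java sketch of waiting for the condition instead of asserting it once. Kafka's own test utilities already provide helpers along these lines (for example TestUtils.waitUntilTrue in the Scala tests); the WaitUntil class below is a hypothetical stand-in, and the timeout and pause values are arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

class WaitUntil {

    // Re-evaluates the condition every pauseMs until it holds or timeoutMs
    // elapses. Returns true if the condition was met, false on timeout.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(pauseMs));
        }
    }
}
```

Re-checking with a generous upper bound keeps the test fast when reconciliation finishes quickly while tolerating slow runs, which is one way to address both points above.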
[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
[ https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16019: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > Some of the tests in PlaintextConsumer can't seem to deterministically invoke > and verify the consumer callback > -- > > Key: KAFKA-16019 > URL: https://issues.apache.org/jira/browse/KAFKA-16019 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > I was running the PlaintextConsumer tests against the async consumer; however, a > few tests were failing because they could not verify that the listener was > invoked correctly, for example `testPerPartitionLeadMetricsCleanUpWithSubscribe`. > Around 50% of the time, the listener's callsToAssigned was never incremented > correctly. Even after changing it to awaitUntilTrue, the result was the same. > {code:scala} > consumer.subscribe(List(topic, topic2).asJava, listener) > val records = awaitNonEmptyRecords(consumer, tp) > assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}
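The ConsumerRebalanceListener Javadoc states that its callbacks execute in the user thread as part of the poll() call. If that applies here, a wait that re-checks the counter without calling poll() may never observe the callback. The hypothetical sketch below (a fake consumer, not the real API) contrasts checking a condition with polling until it holds; it is one angle worth checking, not a diagnosis of this issue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BooleanSupplier;

class PollUntil {

    // Hypothetical stand-in for a consumer whose rebalance callbacks are queued
    // in the background but only executed on the caller's thread inside poll().
    static class FakeConsumer {
        final Deque<Runnable> pendingCallbacks = new ArrayDeque<>();

        void poll() {
            Runnable callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.run();
            }
        }
    }

    // Calls poll() between checks so that queued callbacks get a chance to run.
    static boolean pollUntil(FakeConsumer consumer, BooleanSupplier condition, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            consumer.poll();
        }
        return condition.getAsBoolean();
    }
}
```

Until poll() runs, the queued callback has not executed and the counter stays at 0; pollUntil drives poll() and so observes the increment.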
[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16010: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. 
You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
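The WARN lines above come from the consumer's poll-interval check. As a simplified model (a hypothetical class, not the HeartbeatRequestManager code), a timer restarts on every poll() and counts as expired once the time since the last poll() reaches max.poll.interval.ms; per the max.poll.interval.ms documentation, the consumer is then considered failed and the group rebalances. Times are passed in explicitly so the model is deterministic.

```java
class PollIntervalTracker {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTracker(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called whenever the application calls poll(): restarts the interval.
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // True once the time since the last poll() reaches max.poll.interval.ms.
    boolean isExpired(long nowMs) {
        return nowMs - lastPollMs >= maxPollIntervalMs;
    }
}
```

In this model, both remedies named in the log message work the same way: a larger max.poll.interval.ms raises the threshold, and a smaller max.poll.records shortens the processing time between successive onPoll calls.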
[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16009: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16008: -- Component/s: (was: system tests) Labels: consumer-threading-refactor integration-tests timeout (was: consumer-threading-refactor timeout) > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
[jira] [Updated] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16134: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer
[jira] [Updated] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16134: -- Component/s: (was: system tests) > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
[ https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16011: -- Labels: integration-tests kip-848-client-support timeout (was: kip-848-client-support timeout) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose > > > Key: KAFKA-16011 > URL: https://issues.apache.org/jira/browse/KAFKA-16011 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: integration-tests, kip-848-client-support, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, > topic-1), Set(), Set(topic1-1, topic1-5)) > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376) > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to > FATAL state > 
(org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456) > [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to > (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139) > org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs > must be provided in first request. > [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
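Comparing the expected partitions with the assignment reported in the error shows why the assertion fails: together the three consumers own topic1-0, topic1-1, topic1-5, topic-0 and topic-1, so topic1-2, topic1-3 and topic1-4 are not assigned to anyone. The Java check below is a hypothetical illustration in the spirit of validateGroupAssignment, not its actual implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class AssignmentCheck {

    // Valid when no partition is owned twice and, together, the consumers own
    // exactly the expected partitions.
    static boolean isValid(Set<String> expected, List<Set<String>> assignments) {
        Set<String> seen = new HashSet<>();
        for (Set<String> assignment : assignments) {
            for (String partition : assignment) {
                if (!seen.add(partition)) {
                    return false; // same partition assigned to two consumers
                }
            }
        }
        return seen.equals(expected);
    }

    // Expected partitions that no consumer owns, sorted for readable output.
    static Set<String> unassigned(Set<String> expected, List<Set<String>> assignments) {
        Set<String> result = new TreeSet<>(expected);
        for (Set<String> assignment : assignments) {
            result.removeAll(assignment);
        }
        return result;
    }
}
```

For the data in the error message, isValid returns false and unassigned returns [topic1-2, topic1-3, topic1-4].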
[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16150: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support system-tests) > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16135: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > --- > > Key: KAFKA-16135 > URL: https://issues.apache.org/jira/browse/KAFKA-16135 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > > The test > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > is incredibly flaky: it failed 3 builds in a row for the 3.7 release > candidate, across different JDK versions. Locally it also fails often and > requires a few retries to pass.
[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16150: -- Component/s: (was: system tests) > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
[ https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16011: -- Component/s: (was: system tests) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose > > > Key: KAFKA-16011 > URL: https://issues.apache.org/jira/browse/KAFKA-16011 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: integration-tests, kip-848-client-support, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, > topic-1), Set(), Set(topic1-1, topic1-5)) > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376) > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to > FATAL state > (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456) > [2023-12-13 
15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to > (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139) > org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs > must be provided in first request. > [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
[jira] [Updated] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16135: -- Component/s: (was: system tests) > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > --- > > Key: KAFKA-16135 > URL: https://issues.apache.org/jira/browse/KAFKA-16135 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > > The test > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > is incredibly flaky: it failed 3 builds in a row for the 3.7 release > candidate, across different JDK versions. Locally it also fails often and > requires a few retries to pass.
[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
[ https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16152: -- Component/s: (was: system tests) > Fix > PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart > -- > > Key: KAFKA-16152 > URL: https://issues.apache.org/jira/browse/KAFKA-16152 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Component/s: (was: system tests) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16037) Upgrade existing system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16037: -- Labels: kip-848-client-support system-tests (was: kip-848-client-support) > Upgrade existing system tests to use new consumer > - > > Key: KAFKA-16037 > URL: https://issues.apache.org/jira/browse/KAFKA-16037 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Dongnuo Lyu >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to parameterize the tests so that they run twice: once for the > old Consumer and once for the new Consumer.
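Kafka's system tests are Python (ducktape) tests, so the following Java sketch only illustrates the idea of parameterization: run one test body once per value of the consumer's group.protocol setting ("classic" for the existing consumer, "consumer" for the new one, matching the groupProtocol=consumer test names above). BothProtocols, runForEachProtocol, and the bootstrap address and group id are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;

class BothProtocols {

    // The two values of the consumer's group.protocol setting.
    enum GroupProtocol { CLASSIC, CONSUMER }

    static Properties consumerProps(String bootstrapServers, String groupId, GroupProtocol protocol) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("group.protocol", protocol.name().toLowerCase(Locale.ROOT));
        return props;
    }

    // Runs the same test body once per protocol and returns the protocols exercised.
    static List<String> runForEachProtocol(Consumer<Properties> testBody) {
        List<String> ran = new ArrayList<>();
        for (GroupProtocol protocol : GroupProtocol.values()) {
            Properties props = consumerProps("localhost:9092", "test-group", protocol);
            testBody.accept(props);
            ran.add(props.getProperty("group.protocol"));
        }
        return ran;
    }
}
```

Keeping the protocol as the only varying input means a failure under one protocol but not the other points directly at the consumer implementation rather than the test.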
[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
[ https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16152: -- Labels: consumer-threading-refactor integration-tests kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > Fix > PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart > -- > > Key: KAFKA-16152 > URL: https://issues.apache.org/jira/browse/KAFKA-16152 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0
[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15691: -- Labels: kip-848-client-support system-tests (was: kip-848-client-support) > Add new system tests to use new consumer > > > Key: KAFKA-15691 > URL: https://issues.apache.org/jira/browse/KAFKA-15691 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0
[PR] [WIP] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]
jolshan opened a new pull request, #15303: URL: https://github.com/apache/kafka/pull/15303 Playing around with adding a transaction version feature WIP
Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
showuon commented on PR #15273: URL: https://github.com/apache/kafka/pull/15273#issuecomment-1920457353 > @showuon I rebased and addressed your comments. I did not remove the property definitions from KafkaConfig, because that breaks a lot of tests. I think this should be done as part of the KafkaConfig migration. In the end it is a small amount of duplication, but properties are always defined in one place. Thanks @fvaleri! I had another look, and I still think the duplication is not a great idea. It could cause problems someday, even if it looks fine now. From what I can see, almost all of the config validation lives in KafkaConfig, so could we remove the config definitions from LogConfig? I imagine LogConfig could do something similar to RaftConfig, which has no config definitions of its own. If you want a copy of the config values, you could pass a KafkaConfig instance into LogConfig [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1513). Does that make sense?
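A hypothetical Java sketch of the single-definition approach suggested in this comment: one class owns the key definitions, defaults and validation, and the log-level settings object is handed an instance of it instead of redefining the keys. BrokerConfig, LogSettings, the keys and the default values are all illustrative; they are not Kafka's KafkaConfig, LogConfig or RaftConfig.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-level config: the only place keys and defaults are defined and validated.
class BrokerConfig {
    private static final Map<String, Long> DEFAULTS = Map.of(
        "segment.bytes", 1024L,
        "retention.ms", 60_000L);

    private final Map<String, Long> values = new HashMap<>(DEFAULTS);

    BrokerConfig(Map<String, Long> overrides) {
        for (Map.Entry<String, Long> entry : overrides.entrySet()) {
            if (!DEFAULTS.containsKey(entry.getKey())) {
                throw new IllegalArgumentException("Unknown config: " + entry.getKey());
            }
            if (entry.getValue() < 0) {
                throw new IllegalArgumentException(entry.getKey() + " must be non-negative");
            }
            values.put(entry.getKey(), entry.getValue());
        }
    }

    long getLong(String key) {
        Long value = values.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Unknown config: " + key);
        }
        return value;
    }
}

// Hypothetical log-level view: no definitions of its own, it only copies
// values out of the broker config it is given.
class LogSettings {
    final long segmentBytes;
    final long retentionMs;

    LogSettings(BrokerConfig config) {
        this.segmentBytes = config.getLong("segment.bytes");
        this.retentionMs = config.getLong("retention.ms");
    }
}
```

Because LogSettings never declares a key or default, there is nothing to drift out of sync: adding or changing a definition happens only in BrokerConfig.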
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gaurav-narula commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1473758133 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4912,7 +4919,9 @@ class ReplicaManagerTest { assertTrue(fooPart eq fooPart2) val bar1 = new TopicPartition("bar", 1) replicaManager.markPartitionOffline(bar1) - assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID)) + val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get + assertTrue(barNew) + assertEquals(bar1, barPart.topicPartition) Review Comment: Addressed in [cdf9c0f](https://github.com/apache/kafka/pull/15263/commits/cdf9c0f0cf817e68ca27092addb21308a9bc3762)
Re: [PR] KIP-909 [kafka]
github-actions[bot] commented on PR #14691: URL: https://github.com/apache/kafka/pull/14691#issuecomment-1920440845 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Created] (KAFKA-16214) No user info when SASL authentication failure
Luke Chen created KAFKA-16214: - Summary: No user info when SASL authentication failure Key: KAFKA-16214 URL: https://issues.apache.org/jira/browse/KAFKA-16214 Project: Kafka Issue Type: Bug Affects Versions: 3.6.0 Reporter: Luke Chen Assignee: Luke Chen When client authentication fails, the server logs only the client IP address. The IP address sometimes cannot identify a specific user, especially if there is a proxy between the client and the server. Ex: {code:java} INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9093-127.0.0.1:53223-5) (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector) {code} If many failed-authentication logs appear on the server, it would help to identify quickly who is triggering them. Adding the client info to the log is a good start.
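Here is a minimal sketch of the proposed improvement. It assumes the server still has the client-supplied username when authentication fails (for SASL/PLAIN, the username is part of the client's initial response). The class and method names are hypothetical and do not come from Kafka's `Selector` or authenticator code. The username is appended only when one is known, so the IP-only message is otherwise unchanged. The name is client-supplied and unverified, so the sketch labels it as claimed.

```java
import java.util.Optional;

/** Hypothetical helper, not Kafka's actual logging code. */
final class AuthFailureLogFormat {
    private AuthFailureLogFormat() {}

    /**
     * Builds a failed-authentication message in the style of the log line above,
     * adding the client-supplied username when one was received.
     */
    static String describe(String clientAddress, String channelId,
                           Optional<String> claimedUser, String reason) {
        StringBuilder sb = new StringBuilder("Failed authentication with ").append(clientAddress);
        // The username is whatever the client sent; it was not verified, so label it as claimed.
        claimedUser.ifPresent(user -> sb.append(" (claimedUser=").append(user).append(')'));
        sb.append(" (channelId=").append(channelId).append(')');
        sb.append(" (").append(reason).append(')');
        return sb.toString();
    }
}
```

For example, `describe("/127.0.0.1", "127.0.0.1:9093-127.0.0.1:53223-5", Optional.of("alice"), "Authentication failed: Invalid username or password")` yields `Failed authentication with /127.0.0.1 (claimedUser=alice) (channelId=127.0.0.1:9093-127.0.0.1:53223-5) (Authentication failed: Invalid username or password)`.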
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1473746536 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4912,7 +4919,9 @@ class ReplicaManagerTest { assertTrue(fooPart eq fooPart2) val bar1 = new TopicPartition("bar", 1) replicaManager.markPartitionOffline(bar1) - assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID)) + val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get + assertTrue(barNew) + assertEquals(bar1, barPart.topicPartition) Review Comment: Could we add a test case for returning `None`? After this PR, there are two cases when creating a partition in OfflinePartition, so we should test both. ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -3805,4 +3805,281 @@ class PartitionTest extends AbstractPartitionTest { when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker))) } } + + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def makeLeaderInvokesgetOrCreateLog_OnOnlineLogDir(isNew: Boolean): Unit = { Review Comment: Thanks for adding the tests.
Re: [PR] KAFKA-16180: Fix UMR and LAIR handling during ZK migration [kafka]
mumrah commented on code in PR #15293: URL: https://github.com/apache/kafka/pull/15293#discussion_r1473671017 ## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ## @@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.L } object ZkMetadataCache { - /** - * Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a - * KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method. - */ - def maybeInjectDeletedPartitionsFromFullMetadataRequest( + def transformKRaftControllerFullMetadataRequest( currentMetadata: MetadataSnapshot, requestControllerEpoch: Int, requestTopicStates: util.List[UpdateMetadataTopicState], - ): Seq[Uuid] = { -val prevTopicIds = currentMetadata.topicIds.values.toSet -val requestTopics = requestTopicStates.asScala.map { topicState => - topicState.topicName() -> topicState.topicId() -}.toMap - -val deleteTopics = prevTopicIds -- requestTopics.values.toSet -if (deleteTopics.isEmpty) { - return Seq.empty + ): (util.List[UpdateMetadataTopicState], util.List[String]) = { +val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]() +requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state)) +val logMessages = new util.ArrayList[String] +val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]() +currentMetadata.topicNames.forKeyValue((id, name) => { + Option(topicIdToNewState.get(id)) match { +case None => + currentMetadata.partitionStates.get(name) match { +case None => logMessages.add(s"Error: topic ${name} appeared in currentMetadata.topicNames, " + + "but not in currentMetadata.partitionStates.") +case Some(oldPartitionStates) => + logMessages.add(s"Removing topic ${name} with ID ${id} from the metadata cache since " + +"the full UMR did not include it.") + newRequestTopicStates.add(createDeletionEntries(name, +id, 
+oldPartitionStates.values, +requestControllerEpoch)) + } +case Some(newTopicState) => + val indexToState = new util.HashMap[Integer, UpdateMetadataPartitionState] + newTopicState.partitionStates().forEach(part => indexToState.put(part.partitionIndex, part)) + currentMetadata.partitionStates.get(name) match { +case None => logMessages.add(s"Error: topic ${name} appeared in currentMetadata.topicNames, " + + "but not in currentMetadata.partitionStates.") +case Some(oldPartitionStates) => + oldPartitionStates.foreach(state => indexToState.remove(state._1.toInt)) + if (!indexToState.isEmpty) { +logMessages.add(s"Removing ${indexToState.size()} partition(s) from topic ${name} with " + + s"ID ${id} from the metadata cache since the full UMR did not include them.") +newRequestTopicStates.add(createDeletionEntries(name, + id, + indexToState.values().asScala, + requestControllerEpoch)) + } + } + } +}) +if (newRequestTopicStates.isEmpty) { + // If the output is the same as the input, optimize by just returning the input. + (requestTopicStates, logMessages) +} else { + // If the output has some new entries, they should all appear at the beginning. This will + // ensure that the old stuff is cleared out before the new stuff is added. We will need a + // new list for this, of course. + newRequestTopicStates.addAll(requestTopicStates) + (newRequestTopicStates, logMessages) } + } -deleteTopics.foreach { deletedTopicId => - val topicName = currentMetadata.topicNames(deletedTopicId) Review Comment: I believe this was the source of the NoSuchElementException reported in the JIRA, is that right?
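The log messages in `transformKRaftControllerFullMetadataRequest` describe the intent: remove topics, and partitions of topics, that the cache holds but a full UMR omits. That intent can be sketched as a set difference. This is a simplified model under stated assumptions, not the PR's Scala code. Topic IDs are plain strings rather than `Uuid`s, partition states are reduced to partition indexes, and building the actual deletion entries (`createDeletionEntries`) is left out. All names here are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Simplified model of the full-UMR reconciliation; hypothetical names throughout. */
final class FullUmrDiff {
    private FullUmrDiff() {}

    /**
     * For each topic ID in the cache, returns the partition indexes that the full request
     * does not mention. A topic missing from the request entirely loses all its partitions.
     */
    static Map<String, Set<Integer>> partitionsToDelete(Map<String, Set<Integer>> cachedByTopicId,
                                                        Map<String, Set<Integer>> requestByTopicId) {
        Map<String, Set<Integer>> deletions = new LinkedHashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : cachedByTopicId.entrySet()) {
            Set<Integer> missing = new TreeSet<>(entry.getValue());
            Set<Integer> requested = requestByTopicId.get(entry.getKey());
            if (requested != null) {
                missing.removeAll(requested); // keep only the partitions the request dropped
            }
            if (!missing.isEmpty()) {
                deletions.put(entry.getKey(), missing);
            }
        }
        return deletions;
    }
}
```

Per the comments in the diff, the PR places the resulting deletion entries before the original topic states, so stale state is cleared before new state is applied. When there is nothing to delete, it returns the input list unchanged.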
[jira] [Commented] (KAFKA-12549) Allow state stores to opt-in transactional support
[ https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812993#comment-17812993 ] Matthias J. Sax commented on KAFKA-12549: - [~high.lee] – this ticket is superseded by https://issues.apache.org/jira/browse/KAFKA-14412 which is already WIP. > Allow state stores to opt-in transactional support > -- > > Key: KAFKA-12549 > URL: https://issues.apache.org/jira/browse/KAFKA-12549 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Right now Kafka Streams' EOS implementation does not make any assumptions > about the state store's transactional support. Allowing the state stores to > optionally provide transactional support can have multiple benefits. E.g., if > we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, > {{commitTxn}} and {{abortTxn}}, the Streams library can determine if these are > supported via an additional {{boolean transactional()}} API, and if so, > these APIs can be used under both ALOS and EOS like the following (otherwise > just fall back to the normal processing logic): > Within thread processing loops: > 1. store.beginTxn > 2. store.put // during processing > 3. streams commit // either through eos protocol or not > 4. store.commitTxn > 5. start the next txn by store.beginTxn > If the state stores allow Streams to do something like above, we can have the > following benefits: > * Reduce the duplicated records upon crashes for ALOS (note this is still not EOS, > but a middle ground where uncommitted data within a state store > would not be retained if store.commitTxn failed). > * No need to wipe the state store and re-bootstrap from scratch upon crashes > for EOS. E.g., if a crash happened after the streams commit completed > but before store.commitTxn.
We can instead just roll forward the transaction by > replaying the changelog from the second most recent streams committed offset > towards the most recent committed offset. > * Remote stores that support txn then do not need to support wiping > (https://issues.apache.org/jira/browse/KAFKA-12475). > * We can fix the known issues of emit-on-change > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams). > * We can support "query committed data only" for interactive queries (see > below for reasons). > As for the implementation of these APIs, there are several options: > * The state store itself has natural transaction features (e.g. RocksDB). > * Use an in-memory buffer for all puts within a transaction, and upon > `commitTxn` write the whole buffer as a batch to the underlying state store, > or just drop the whole buffer upon aborting. Then for interactive queries, > one can optionally query only the underlying store, i.e. committed data only. > * Use a separate store as the transient persistent buffer. Upon `beginTxn` > create a new empty transient store, and upon `commitTxn` merge the store into > the underlying store. The same applies to interactive queries for committed-only > data. Compared with the option above, this has the benefit that there's no memory > pressure even with long transactions, but it incurs more complexity / > performance overhead with the separate persistent store.
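The second implementation option above buffers puts in memory, applies the buffer on `commitTxn`, and drops it on abort. It can be sketched as follows. This is a hypothetical, single-threaded illustration over a `HashMap`, not the `StateStore` interface from Kafka Streams. The method names mirror those proposed in the ticket. Deletes, range queries, and persistence are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the "in-memory buffer" option; not a Kafka Streams API. */
final class BufferedTxnStore<K, V> {
    private final Map<K, V> committed = new HashMap<>();
    private Map<K, V> buffer; // null while no transaction is open

    boolean transactional() {
        return true;
    }

    void beginTxn() {
        if (buffer != null) throw new IllegalStateException("transaction already open");
        buffer = new HashMap<>();
    }

    void put(K key, V value) {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        buffer.put(key, value);
    }

    /** Processing reads see the open transaction's writes first. */
    V get(K key) {
        if (buffer != null && buffer.containsKey(key)) return buffer.get(key);
        return committed.get(key);
    }

    /** "Query committed data only", e.g. for interactive queries. */
    V getCommitted(K key) {
        return committed.get(key);
    }

    void commitTxn() {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        committed.putAll(buffer); // apply the whole buffer as one batch
        buffer = null;
    }

    void abortTxn() {
        buffer = null; // drop all uncommitted writes
    }
}
```

`getCommitted` corresponds to querying committed data only, while `get` also sees the open transaction's writes. The trade-off named in the ticket still applies: the buffer grows with the transaction, and the separate-store option avoids that memory pressure.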
Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]
gharris1727 commented on code in PR #14663: URL: https://github.com/apache/kafka/pull/14663#discussion_r1473603606 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1994,53 +1616,6 @@ private void expectInitializeTask() { PowerMock.expectLastCall(); } -private void expectRebalanceLossError(RuntimeException e) { -sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT)); -EasyMock.expectLastCall().andThrow(e); - - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong(.andAnswer( -() -> { - rebalanceListener.getValue().onPartitionsLost(INITIAL_ASSIGNMENT); -return ConsumerRecords.empty(); -}); -} - -private void expectRebalanceRevocationError(RuntimeException e) { -sinkTask.close(new HashSet<>(INITIAL_ASSIGNMENT)); -EasyMock.expectLastCall().andThrow(e); - -sinkTask.preCommit(EasyMock.anyObject()); -EasyMock.expectLastCall().andReturn(Collections.emptyMap()); - - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong(.andAnswer( -() -> { - rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT); -return ConsumerRecords.empty(); -}); -} - -private void expectRebalanceAssignmentError(RuntimeException e) { -sinkTask.close(INITIAL_ASSIGNMENT); -EasyMock.expectLastCall(); - -sinkTask.preCommit(EasyMock.anyObject()); -EasyMock.expectLastCall().andReturn(Collections.emptyMap()); - - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - -sinkTask.open(INITIAL_ASSIGNMENT); -EasyMock.expectLastCall().andThrow(e); - - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2); - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong(.andAnswer( -() -> { - rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT); - rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); -return ConsumerRecords.empty(); -}); -} - private void 
expectPollInitialAssignment() { Review Comment: I can't put the comment on the right line, but the next function `expectConsumerWakeup()` in this file is now unused. ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; +import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; +import
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on code in PR #14360: URL: https://github.com/apache/kafka/pull/14360#discussion_r1473438104 ## docs/streams/developer-guide/config-streams.html: ## @@ -1010,6 +1016,18 @@ topology.optimization + windowed.inner.class.serde + + + +Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. + + +Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client. Review Comment: Thanks Sophie. > Note that this config is only used by plain consumer/producer clients For this case, why are we documenting it in the KS docs -- should it not be in the clients docs? (Also, this applies to `window.size.ms`, introduced in 2.8 via https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size, right, but not to `windowed.inner.class.serde`, which is a KS config added in 3.0 via https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930). In the end, the KS module provides the windowed serdes, and thus it does not make sense to add `window.size.ms` to `ClientConfig` -- especially because, as you pointed out, it is only necessary for the console consumer. Thus, while `StreamsConfig#WINDOW_SIZE_MS_CONFIG` must exist as a variable name, I am wondering if it's actually correct that we added it as a StreamsConfig, i.e., via `define(...)`? Maybe we should do a small KIP and remove it? -- For now, we should not mention `window.size.ms` in the KS docs on the web page (at least not as a "top level config"); we should either add it to a "windowed serde" section, or to the (console) consumer config section where it belongs. `windowed.inner.class.serde` is a KS config and should just be documented in the regular way.
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on code in PR #14360: URL: https://github.com/apache/kafka/pull/14360#discussion_r1473419962 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -482,7 +482,8 @@ public class StreamsConfig extends AbstractConfig { public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version"; private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use."; -/** {@code cache.max.bytes.buffering} */ +/** {@code cache.max.bytes.buffering} + * @deprecated since 3.4.0 Use cache.max.bytes instead with the cache.size metric at the DEBUG level. */ Review Comment: ```suggestion * @deprecated since 3.4.0 Use {@link #CACHE_MAX_BYTES_CONFIG "cache.max.bytes"} instead. */ ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -537,14 +538,16 @@ public class StreamsConfig extends AbstractConfig { public static final String ROCKS_DB = "rocksDB"; public static final String IN_MEMORY = "in_memory"; -/** {@code default.windowed.key.serde.inner} */ +/** {@code default.windowed.key.serde.inner + * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */ Review Comment: ```suggestion * @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead.} */ ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -482,7 +482,8 @@ public class StreamsConfig extends AbstractConfig { public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version"; private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use."; -/** {@code cache.max.bytes.buffering} */ +/** {@code cache.max.bytes.buffering} + * @deprecated since 3.4.0 Use cache.max.bytes instead with the cache.size metric at the DEBUG level. */ Review Comment: Not sure if I understand the reference to `cache.size` metric? Can you elaborate? 
## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -537,14 +538,16 @@ public class StreamsConfig extends AbstractConfig { public static final String ROCKS_DB = "rocksDB"; public static final String IN_MEMORY = "in_memory"; -/** {@code default.windowed.key.serde.inner} */ +/** {@code default.windowed.key.serde.inner + * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */ @SuppressWarnings("WeakerAccess") @Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner"; private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " + "org.apache.kafka.common.serialization.Serde interface."; -/** {@code default.windowed.value.serde.inner} */ +/** {@code default.windowed.value.serde.inner + * @deprecated since 3.0.0 Use windowed.inner.class.serde instead.} */ Review Comment: as above ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -620,7 +623,8 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; -/** {@code auto.include.jmx.reporter} */ +/** {@code auto.include.jmx.reporter + * @deprecated and will removed in 4.0.0 Use org.apache.kafka.common.metrics.JmxReporter in metric.reporters instead.} */ Review Comment: as above
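In these hunks, the closing brace of `{@code ...}` comes after the `@deprecated` text. The deprecation note therefore sits inside the inline tag, and javadoc is unlikely to treat it as a block tag. The sketch below shows the presumably intended shape: it closes the inline tag on the first line and uses `{@link}` as in the review suggestions. The class is a hypothetical stand-in for `StreamsConfig`, and only two constants are shown.

```java
/** Hypothetical stand-in for StreamsConfig; only two constants are shown. */
final class StreamsConfigJavadocSketch {
    private StreamsConfigJavadocSketch() {}

    /** {@code windowed.inner.class.serde} */
    public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";

    /**
     * {@code default.windowed.key.serde.inner}
     * @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead.
     */
    @Deprecated
    public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
}
```

Keeping the `{@code}` tag on one line leaves `@deprecated` at the start of its own comment line, where javadoc expects block tags.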
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee commented on code in PR #15216: URL: https://github.com/apache/kafka/pull/15216#discussion_r1473330427 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; + +/** + * Base class for different consumer metrics to extend. This class helps to construct the logical group name from the + * given prefix and suffix, and provides a few common utilities. 
+ * + * + * The suffix can be one of the following: + * + * -coordinator-metrics: {@link MetricGroupSuffix#COORDINATOR} + * -metrics: {@link MetricGroupSuffix#CONSUMER} + * + * + */ +public abstract class AbstractConsumerMetricsManager { Review Comment: See the latest commit - I removed this class to simplify the implementation a bit.
Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]
dajac commented on code in PR #15253: URL: https://github.com/apache/kafka/pull/15253#discussion_r1473247269 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java: ## @@ -158,36 +295,71 @@ public ApiResult handleResponse( return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } +private Set convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) { +return assignment.topicPartitions().stream().flatMap(topic -> +topic.partitions().stream().map(partition -> +new TopicPartition(topic.topicName(), partition) +) +).collect(Collectors.toSet()); +} + private void handleError( CoordinatorKey groupId, Errors error, +String errorMsg, Map failed, -Set groupsToUnmap +Set groupsToUnmap, +boolean isConsumerGroupResponse ) { +String apiName = isConsumerGroupResponse ? "ConsumerGroupDescribe" : "DescribeGroups"; + switch (error) { case GROUP_AUTHORIZATION_FAILED: -log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); -failed.put(groupId, error.exception()); +log.debug("`{}` request for group id {} failed due to error {}.", apiName, groupId.idValue, error); +failed.put(groupId, error.exception(errorMsg)); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry -log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + -"is still in the process of loading state. Will retry", groupId.idValue); +log.debug("`{}` request for group id {} failed because the coordinator " + +"is still in the process of loading state. Will retry.", apiName, groupId.idValue); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request -log.debug("`DescribeGroups` request for group id {} returned error {}. 
" + -"Will attempt to find the coordinator again and retry", groupId.idValue, error); +log.debug("`{}` request for group id {} returned error {}. " + +"Will attempt to find the coordinator again and retry.", apiName, groupId.idValue, error); groupsToUnmap.add(groupId); break; +case UNSUPPORTED_VERSION: +if (isConsumerGroupResponse) { +log.debug("`{}` request for group id {} failed because the API is not " + +"supported. Will retry with `DescribeGroups` API.", apiName, groupId.idValue); +useClassicGroupApi.add(groupId.idValue); +} else { +log.error("`{}` request for group id {} because the `ConsumerGroupDescribe` API is not supported.", +apiName, groupId.idValue); +failed.put(groupId, error.exception(errorMsg)); +} +break; + +case GROUP_ID_NOT_FOUND: +if (isConsumerGroupResponse) { +log.debug("`{}` request for group id {} failed because the group is not " + +"a new consumer group. Will retry with `DescribeGroups` API.", apiName, groupId.idValue); +useClassicGroupApi.add(groupId.idValue); +} else { +log.error("`{}` request for group id {} because the group does not exist.", apiName, groupId.idValue); Review Comment: Good catch! Let me fix this.
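The fallback behaviour in this `handleError` can be summarised as a small decision table. The sketch below is hypothetical. The enum is a local stand-in for `Errors` and covers only the codes visible in the diff. The classic-API failure for `GROUP_ID_NOT_FOUND` is inferred from the `log.error` line, since the rest of that branch is cut off here. Codes not listed are treated as failures in this sketch only.

```java
/** Hypothetical model of the error handling in the diff above; not the Kafka admin client code. */
final class DescribeFallbackSketch {
    private DescribeFallbackSketch() {}

    /** Local stand-in for the relevant org.apache.kafka.common.protocol.Errors values. */
    enum ErrorCode {
        GROUP_AUTHORIZATION_FAILED, COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR, UNSUPPORTED_VERSION, GROUP_ID_NOT_FOUND, UNKNOWN
    }

    enum Action { FAIL, RETRY, UNMAP_AND_RETRY, RETRY_WITH_DESCRIBE_GROUPS }

    static Action decide(ErrorCode error, boolean isConsumerGroupResponse) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY;                 // coordinator still loading: retry as-is
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.UNMAP_AND_RETRY;       // find the coordinator again, then retry
            case UNSUPPORTED_VERSION:
            case GROUP_ID_NOT_FOUND:
                // Only a ConsumerGroupDescribe response falls back to the classic DescribeGroups API.
                return isConsumerGroupResponse ? Action.RETRY_WITH_DESCRIBE_GROUPS : Action.FAIL;
            case GROUP_AUTHORIZATION_FAILED:
            default:
                return Action.FAIL;
        }
    }
}
```

The boolean parameter plays the role of `isConsumerGroupResponse` in the diff. The same error code leads to a fallback for the new API and to a failure for the classic one.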
Re: [PR] MINOR: Change Connect integration StopStartLatch::await to throw TimeoutException on timeout [kafka]
C0urante closed pull request #15178: MINOR: Change Connect integration StopStartLatch::await to throw TimeoutException on timeout URL: https://github.com/apache/kafka/pull/15178
Re: [PR] MINOR: Change Connect integration StopStartLatch::await to throw TimeoutException on timeout [kafka]
C0urante commented on PR #15178: URL: https://github.com/apache/kafka/pull/15178#issuecomment-1919630582 This change wasn't really that valuable; the only information it added was already trivial and what one would expect to see (e.g., testing code that waited for a connector to be stopped once would report that it was still waiting for the connector to be stopped once--not exactly jaw-dropping insight). I've instead begun debugging our tests with full logs enabled on https://github.com/apache/kafka/pull/15229, which has proven much more useful. Closing this PR as it's not worth the review bandwidth.
[PR] KAFKA-15524, KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante opened a new pull request, #15302: URL: https://github.com/apache/kafka/pull/15302 [Jira 1](https://issues.apache.org/jira/browse/KAFKA-15524), [Jira 2](https://issues.apache.org/jira/browse/KAFKA-15917) In some rare cases, our integration tests observe that tasks are up and running for our sink connectors, and issue a subsequent request to stop the connector, before the consumers of these tasks have had a chance to form/join a group. When this happens, requests to alter/reset the offsets for the connector actually succeed, since the group either doesn't exist, or is completely empty. As a quick fix, we can wait for offsets to be committed for the connector before stopping it and attempting to modify its offsets. This goes a bit further than necessary (we should only have to wait for at least one task's consumer to have formed/joined a group), but off the top of my head it was the cleanest and briefest way to guarantee that a group had been formed. It also makes a little more sense in the context of the test, since there's not much use in modifying connector offsets when none exist yet. Opening as a draft; will wait for CI to complete before marking ready for review. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
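The quick fix described above polls until the connector has committed offsets, then stops it. This follows a generic wait-until-condition pattern. A minimal, hypothetical helper is sketched below. It is not the `TestUtils` or `EmbeddedConnect` utilities that the Kafka tests actually use, and the condition in the usage note (`committedOffsetsExist`) is an assumed placeholder.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; not Kafka's test utilities. */
final class WaitUntilSketch {
    private WaitUntilSketch() {}

    /** Polls {@code condition} every {@code pollMs} until it holds or {@code timeoutMs} elapses. */
    static void waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs, String description)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison against the monotonic deadline.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Timed out waiting for: " + description);
            }
            Thread.sleep(pollMs);
        }
    }
}
```

In a test, the call would look like `waitUntil(() -> committedOffsetsExist(connectorName), 30_000, 100, "offsets committed for " + connectorName)`. The connector would be stopped only after this returns, and its offsets altered or reset after that.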
Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]
AndrewJSchofield commented on code in PR #15253: URL: https://github.com/apache/kafka/pull/15253#discussion_r1473234152 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java: ## @@ -158,36 +295,71 @@ public ApiResult handleResponse( return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } +private Set convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) { +return assignment.topicPartitions().stream().flatMap(topic -> +topic.partitions().stream().map(partition -> +new TopicPartition(topic.topicName(), partition) +) +).collect(Collectors.toSet()); +} + private void handleError( CoordinatorKey groupId, Errors error, +String errorMsg, Map failed, -Set groupsToUnmap +Set groupsToUnmap, +boolean isConsumerGroupResponse ) { +String apiName = isConsumerGroupResponse ? "ConsumerGroupDescribe" : "DescribeGroups"; + switch (error) { case GROUP_AUTHORIZATION_FAILED: -log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); -failed.put(groupId, error.exception()); +log.debug("`{}` request for group id {} failed due to error {}.", apiName, groupId.idValue, error); +failed.put(groupId, error.exception(errorMsg)); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry -log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + -"is still in the process of loading state. Will retry", groupId.idValue); +log.debug("`{}` request for group id {} failed because the coordinator " + +"is still in the process of loading state. Will retry.", apiName, groupId.idValue); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request -log.debug("`DescribeGroups` request for group id {} returned error {}. 
" + -"Will attempt to find the coordinator again and retry", groupId.idValue, error); +log.debug("`{}` request for group id {} returned error {}. " + +"Will attempt to find the coordinator again and retry.", apiName, groupId.idValue, error); groupsToUnmap.add(groupId); break; +case UNSUPPORTED_VERSION: +if (isConsumerGroupResponse) { +log.debug("`{}` request for group id {} failed because the API is not " + +"supported. Will retry with `DescribeGroups` API.", apiName, groupId.idValue); +useClassicGroupApi.add(groupId.idValue); +} else { +log.error("`{}` request for group id {} because the `ConsumerGroupDescribe` API is not supported.", +apiName, groupId.idValue); +failed.put(groupId, error.exception(errorMsg)); +} +break; + +case GROUP_ID_NOT_FOUND: +if (isConsumerGroupResponse) { +log.debug("`{}` request for group id {} failed because the group is not " + +"a new consumer group. Will retry with `DescribeGroups` API.", apiName, groupId.idValue); +useClassicGroupApi.add(groupId.idValue); +} else { +log.error("`{}` request for group id {} because the group does not exist.", apiName, groupId.idValue); Review Comment: This doesn't seem grammatical. It would say "DescribeGroups request for group id {} because the group does not exist". I suggest "failed" or something like that is missing.
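For readers following the fallback logic in this hunk, here is a simplified, hypothetical model of the decision `handleError` makes: retry, rediscover the coordinator, fall back from `ConsumerGroupDescribe` to the classic `DescribeGroups` API, or fail. The enum names and the `lastFailureMessage` field are assumptions for illustration, not the Admin client's API; the failure message includes the "failed" the review asks for.

```java
import java.util.HashSet;
import java.util.Set;

final class DescribeErrorHandling {
    enum ErrorCode {
        GROUP_AUTHORIZATION_FAILED, COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR, UNSUPPORTED_VERSION, GROUP_ID_NOT_FOUND
    }

    enum Action { FAIL, RETRY, UNMAP_AND_RETRY, FALLBACK_TO_CLASSIC }

    /** Groups that should be re-described with the classic DescribeGroups API. */
    final Set<String> useClassicGroupApi = new HashSet<>();
    /** Last failure message, kept so its wording can be inspected. */
    String lastFailureMessage = "";

    Action handle(String groupId, ErrorCode error, boolean isConsumerGroupResponse) {
        final String apiName = isConsumerGroupResponse ? "ConsumerGroupDescribe" : "DescribeGroups";
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY; // coordinator still loading: retry as-is
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.UNMAP_AND_RETRY; // rediscover the coordinator via FindCoordinator
            case UNSUPPORTED_VERSION:
            case GROUP_ID_NOT_FOUND:
                if (isConsumerGroupResponse) {
                    // Old broker, or not a new consumer group: fall back to DescribeGroups.
                    useClassicGroupApi.add(groupId);
                    return Action.FALLBACK_TO_CLASSIC;
                }
                break; // the classic API failed too: report the failure below
            default:
                break;
        }
        // Note the verb "failed", which the review says is missing from the log.error messages.
        lastFailureMessage = String.format("`%s` request for group id %s failed due to error %s.",
            apiName, groupId, error);
        return Action.FAIL;
    }
}
```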
[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812824#comment-17812824 ] Lianet Magrans commented on KAFKA-16212: Similar situation and approach on the client side btw [~jolshan]. With the new consumer and KIP-848 we're moving in the direction of internally spreading the use of topic IDs more...step by step. > Cache partitions by TopicIdPartition instead of TopicPartition > -- > > Key: KAFKA-16212 > URL: https://issues.apache.org/jira/browse/KAFKA-16212 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > From the discussion in [PR > 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it > would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of > {{TopicPartition}} to avoid ambiguity. -- This message was sent by Atlassian Jira (v8.20.10#820010)
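To illustrate the ambiguity the issue describes: a topic that is deleted and recreated under the same name gets a new topic ID, so a cache keyed only by topic name and partition (as with `TopicPartition`) cannot tell the two incarnations apart. The sketch below uses a hypothetical stdlib-only `IdKey` class in place of Kafka's `TopicIdPartition`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

final class TopicIdPartitionDemo {
    /** Hypothetical, simplified stand-in for a (topic ID, topic name, partition) key. */
    static final class IdKey {
        final UUID topicId;
        final String topic;
        final int partition;

        IdKey(UUID topicId, String topic, int partition) {
            this.topicId = topicId;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof IdKey)) return false;
            IdKey other = (IdKey) o;
            return partition == other.partition && topicId.equals(other.topicId) && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicId, topic, partition);
        }
    }

    /** Returns {entries in the name-keyed cache, entries in the ID-keyed cache} after a delete + recreate. */
    static int[] cacheSizesAfterRecreate() {
        Map<String, String> byName = new HashMap<>(); // "topic-partition", i.e. a name-only key
        Map<IdKey, String> byId = new HashMap<>();

        UUID originalId = UUID.randomUUID();
        byName.put("orders-0", "state of original topic");
        byId.put(new IdKey(originalId, "orders", 0), "state of original topic");

        // "orders" is deleted and recreated: same name, new topic ID.
        UUID recreatedId = UUID.randomUUID();
        byName.put("orders-0", "state of recreated topic"); // silently replaces the old entry
        byId.put(new IdKey(recreatedId, "orders", 0), "state of recreated topic"); // distinct entry

        return new int[] {byName.size(), byId.size()};
    }
}
```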
[jira] [Created] (KAFKA-16213) KRaft should honor voterId outside of VotedState
José Armando García Sancio created KAFKA-16213: -- Summary: KRaft should honor voterId outside of VotedState Key: KAFKA-16213 URL: https://issues.apache.org/jira/browse/KAFKA-16213 Project: Kafka Issue Type: Improvement Components: kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio The current implementation of KRaft only stores the id of the replica for which it voted when it is in the VotedState. When it transitions to other states like Follower, Leader, Resigned, etc. it doesn't continue to remember and persist the replica id of the replica for which it voted.
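A minimal in-memory sketch of the behaviour the issue asks for, assuming simplified Raft voting rules: the voted-for replica id is remembered for the whole epoch, so transitioning to another state (here, Follower) does not let the replica vote for a second candidate in the same epoch. `QuorumVoteState` and its methods are hypothetical; the real KRaft implementation must also persist the vote to disk and check that the candidate's log is up to date, both omitted here.

```java
import java.util.OptionalInt;

final class QuorumVoteState {
    enum Role { UNATTACHED, VOTED, FOLLOWER }

    private int epoch;
    private Role role = Role.UNATTACHED;
    // Remembered for the whole epoch, not only while in the VOTED role.
    private OptionalInt votedId = OptionalInt.empty();

    /** Grants a vote if we have not voted in this epoch, or already voted for the same candidate. */
    boolean handleVoteRequest(int candidateEpoch, int candidateId) {
        if (candidateEpoch < epoch) {
            return false; // stale candidate
        }
        if (candidateEpoch > epoch) {
            epoch = candidateEpoch; // new epoch: any previous vote no longer applies
            votedId = OptionalInt.empty();
            role = Role.UNATTACHED;
        }
        if (votedId.isPresent()) {
            return votedId.getAsInt() == candidateId; // never vote for two candidates in one epoch
        }
        votedId = OptionalInt.of(candidateId);
        role = Role.VOTED;
        return true;
    }

    /** Becoming a follower in the same epoch keeps the recorded vote instead of forgetting it. */
    void becomeFollower(int leaderEpoch) {
        if (leaderEpoch > epoch) {
            epoch = leaderEpoch;
            votedId = OptionalInt.empty();
        }
        role = Role.FOLLOWER;
    }

    Role role() {
        return role;
    }
}
```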
Re: [PR] KAFKA-16168; Implement GroupCoordinator.onPartitionsDeleted [kafka]
jolshan commented on code in PR #15237: URL: https://github.com/apache/kafka/pull/15237#discussion_r1473219011 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, List records) { return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId); } +/** + * Remove offsets of the partitions that have been deleted. + * + * @param topicPartitions The partitions that have been deleted. + * @return The list of tombstones (offset commit) to append. + */ +public List onPartitionsDeleted( Review Comment: Thanks for the thorough response. Makes sense to me.
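The Javadoc above describes removing the offsets of deleted partitions and returning the tombstones to append. A hypothetical, heavily simplified model (string keys instead of records, no handling of pending transactional offsets) could look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OffsetCleanup {
    /** Hypothetical simplified store: groupId -> ("topic-partition" -> committed offset). */
    final Map<String, Map<String, Long>> offsetsByGroup = new HashMap<>();

    void commit(String groupId, String topicPartition, long offset) {
        offsetsByGroup.computeIfAbsent(groupId, g -> new HashMap<>()).put(topicPartition, offset);
    }

    /**
     * Removes the offsets of deleted partitions from every group and returns one tombstone
     * key ("groupId/topic-partition") per removed offset, i.e. the records to append.
     */
    List<String> onPartitionsDeleted(Set<String> deletedPartitions) {
        List<String> tombstones = new ArrayList<>();
        for (Map.Entry<String, Map<String, Long>> group : offsetsByGroup.entrySet()) {
            Iterator<String> it = group.getValue().keySet().iterator();
            while (it.hasNext()) {
                String tp = it.next();
                if (deletedPartitions.contains(tp)) {
                    tombstones.add(group.getKey() + "/" + tp);
                    it.remove(); // drop the in-memory offset as well
                }
            }
        }
        return tombstones;
    }
}
```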
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
gaurav-narula commented on code in PR #15262: URL: https://github.com/apache/kafka/pull/15262#discussion_r1473210077 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig, // retrieve the UUID here because logManager.handleLogDirFailure handler removes it val uuid = logManager.directoryId(dir) logManager.handleLogDirFailure(dir) -if (dir == config.metadataLogDir) { +if (dir == new File(config.metadataLogDir).getAbsolutePath && (zkClient.isEmpty || config.migrationEnabled)) { Review Comment: Addressed in https://github.com/apache/kafka/pull/15262/commits/b28e21a5f96d580e4463e20eb73738a157eb440c
[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812822#comment-17812822 ] Justine Olshan commented on KAFKA-16212: When working on KIP-516 (topic IDs) there were many areas where topic IDs could help, but changing them all in one go would have been difficult. I'm happy to see more steps in this direction :) > Cache partitions by TopicIdPartition instead of TopicPartition > -- > > Key: KAFKA-16212 > URL: https://issues.apache.org/jira/browse/KAFKA-16212 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > From the discussion in [PR > 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it > would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of > {{TopicPartition}} to avoid ambiguity.
[jira] [Assigned] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15524: - Assignee: Chris Egerton > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks > -- > > Key: KAFKA-15524 > URL: https://issues.apache.org/jira/browse/KAFKA-15524 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.6.0, 3.5.1 >Reporter: Josep Prat >Assignee: Chris Egerton >Priority: Major > Labels: flaky, flaky-test > > Last seen: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/] > > h3. Error Message > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out.{code} > h3. Stacktrace > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at > 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at >
[jira] [Assigned] (KAFKA-15917) Flaky test - OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15917: - Assignee: Chris Egerton > Flaky test - > OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks > --- > > Key: KAFKA-15917 > URL: https://issues.apache.org/jira/browse/KAFKA-15917 > Project: Kafka > Issue Type: Bug >Reporter: Haruki Okada >Assignee: Chris Egerton >Priority: Major > Labels: flaky-test > Attachments: stdout.log > > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/] > > > {code:java} > Error > java.lang.AssertionError: > Expected: a string containing "zombie sink task" > but: was "Could not alter connector offsets. Error response: > {"error_code":500,"message":"Failed to alter consumer group offsets for > connector test-connector"}" > Stacktrace > java.lang.AssertionError: > Expected: a string containing "zombie sink task" > but: was "Could not alter connector offsets. 
Error response: > {"error_code":500,"message":"Failed to alter consumer group offsets for > connector test-connector"}" > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:431) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
cmccabe commented on code in PR #15262: URL: https://github.com/apache/kafka/pull/15262#discussion_r1473174695 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig, // retrieve the UUID here because logManager.handleLogDirFailure handler removes it val uuid = logManager.directoryId(dir) logManager.handleLogDirFailure(dir) -if (dir == config.metadataLogDir) { +if (dir == new File(config.metadataLogDir).getAbsolutePath && (zkClient.isEmpty || config.migrationEnabled)) { Review Comment: This is not quite the correct check... you should check `config.processRoles` (probably `config.processRoles.isNotEmpty || config.migrationEnabled` )
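The check under discussion can be sketched in Java as follows, assuming the condition the reviewer suggests: the failed directory is compared to the absolute path of the configured metadata log dir, and the failure only gets the metadata-log-dir handling when the broker uses KRaft metadata (non-empty process roles, or ZK migration enabled). `shouldHandleAsMetadataDirFailure` is a hypothetical name; the actual change is Scala code in `ReplicaManager`.

```java
import java.io.File;
import java.util.Set;

final class LogDirFailurePolicy {
    private LogDirFailurePolicy() { }

    /**
     * Hypothetical sketch: true only if the failed directory is the metadata log dir AND
     * the broker actually relies on KRaft metadata (KRaft mode, or ZK mode with migration).
     */
    static boolean shouldHandleAsMetadataDirFailure(String failedDir, String metadataLogDir,
                                                    Set<String> processRoles, boolean migrationEnabled) {
        // The configured value may be relative, so normalise it before comparing.
        boolean isMetadataDir = failedDir.equals(new File(metadataLogDir).getAbsolutePath());
        // Mirrors the suggested `processRoles` non-empty || migrationEnabled condition.
        boolean usesKRaftMetadata = !processRoles.isEmpty() || migrationEnabled;
        return isMetadataDir && usesKRaftMetadata;
    }
}
```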
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
cmccabe commented on code in PR #15262: URL: https://github.com/apache/kafka/pull/15262#discussion_r1473174695 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2586,7 +2586,7 @@ class ReplicaManager(val config: KafkaConfig, // retrieve the UUID here because logManager.handleLogDirFailure handler removes it val uuid = logManager.directoryId(dir) logManager.handleLogDirFailure(dir) -if (dir == config.metadataLogDir) { +if (dir == new File(config.metadataLogDir).getAbsolutePath && (zkClient.isEmpty || config.migrationEnabled)) { Review Comment: This is not the correct check... you should check `config.processRoles`
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473168658 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## Review Comment: Yeah, I can do that. I'm a little concerned that we might forget to move it back though!
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473167769 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +// track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance +private long lastUncommittedBytes = 0L; + +boolean needsCommit(final boolean updateDelta) { +if (maxUncommittedStateBytes < 0) { +// if our transaction buffers are unbounded, we never need to force an early commit +return false; +} + +// force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold +final long uncommittedBytes = tasks.approximateUncommittedStateBytes(); + +final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes); + +final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes; Review Comment: Done
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473167034 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +// track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance +private long lastUncommittedBytes = 0L; + +boolean needsCommit(final boolean updateDelta) { +if (maxUncommittedStateBytes < 0) { +// if our transaction buffers are unbounded, we never need to force an early commit +return false; +} + +// force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold +final long uncommittedBytes = tasks.approximateUncommittedStateBytes(); + +final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes); + +final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes; Review Comment: Yep! 
I did some refactoring and didn't spot the dead code :smile: ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +// track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance +private long lastUncommittedBytes = 0L; + +boolean needsCommit(final boolean updateDelta) { +if (maxUncommittedStateBytes < 0) { +// if our transaction buffers are unbounded, we never need to force an early commit +return false; +} + +// force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold +final long uncommittedBytes = tasks.approximateUncommittedStateBytes(); + +final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes); + +final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes; Review Comment: Woops! I did some refactoring and didn't spot the dead code :smile:
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473166531 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +// track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance +private long lastUncommittedBytes = 0L; + +boolean needsCommit(final boolean updateDelta) { +if (maxUncommittedStateBytes < 0) { +// if our transaction buffers are unbounded, we never need to force an early commit +return false; +} Review Comment: Done
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473165230 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +// track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance +private long lastUncommittedBytes = 0L; Review Comment: Done
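The heuristic in `needsCommit` can be exercised in isolation. The sketch below is hypothetical, not the Streams code: it forces a commit when the current uncommitted bytes plus the growth since the last check would exceed the limit, and it drops the redundant `maxUncommittedStateBytes > -1` test, since the early return already guarantees it. How `updateDelta` is used is not visible in the hunks; here it is assumed to decide whether the current reading becomes the baseline for the next delta.

```java
final class UncommittedBytesTracker {
    private final long maxUncommittedStateBytes; // negative means unbounded
    private long lastUncommittedBytes = 0L;

    UncommittedBytesTracker(long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    /**
     * Returns true if the uncommitted bytes exceed, or are likely to exceed on the next
     * iteration, the configured threshold, using the last observed growth as the estimate.
     */
    boolean needsCommit(long uncommittedBytes, boolean updateDelta) {
        if (maxUncommittedStateBytes < 0) {
            return false; // unbounded buffers never force an early commit
        }
        // No second "max > -1" check needed: the early return above already guarantees it.
        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
        if (updateDelta) {
            lastUncommittedBytes = uncommittedBytes; // assumed semantics of updateDelta
        }
        return needsCommit;
    }
}
```

With a 100-byte limit, readings of 40 and 70 do not trigger a commit (70 + 30 is not above 100), but a reading of 90 does (90 + 20 = 110).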
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee commented on code in PR #15216: URL: https://github.com/apache/kafka/pull/15216#discussion_r1473160823 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; + +/** + * Base class for different consumer metrics to extend. This class helps to construct the logical group name from the + * given prefix and suffix, and provides a few common utilities. 
+ * + * + * The suffix can be one of the following: + * + * -coordinator-metrics: {@link MetricGroupSuffix#COORDINATOR} + * -metrics: {@link MetricGroupSuffix#CONSUMER} + * + * + */ +public abstract class AbstractConsumerMetricsManager { Review Comment: are you suggesting making this a utility class?
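As one possible shape for the utility-class alternative raised in this comment, the group-name construction could be a static helper over the two suffixes listed in the Javadoc. `ConsumerMetricGroups` and `groupName` are hypothetical names; only the suffix strings and the `MetricGroupSuffix` constant names come from the Javadoc.

```java
final class ConsumerMetricGroups {
    /** Mirrors the two suffixes listed in the Javadoc. */
    enum MetricGroupSuffix {
        COORDINATOR("-coordinator-metrics"),
        CONSUMER("-metrics");

        private final String suffix;

        MetricGroupSuffix(String suffix) {
            this.suffix = suffix;
        }

        String suffix() {
            return suffix;
        }
    }

    private ConsumerMetricGroups() { } // static-only utility: no instances, no subclassing

    /** Builds the logical metric group name from a prefix and one of the known suffixes. */
    static String groupName(String prefix, MetricGroupSuffix suffix) {
        return prefix + suffix.suffix();
    }
}
```

The trade-off is that a utility class cannot hold per-manager state (such as the registered sensors), which an abstract base class can.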
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1473153338 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1097,10 +1102,10 @@ void handleRevocation(final Collection revokedPartitions) { for (final Task task : commitNeededActiveTasks) { if (!dirtyTasks.contains(task)) { try { -// for non-revoking active tasks, we should not enforce checkpoint -// since if it is EOS enabled, no checkpoint should be written while -// the task is in RUNNING tate -task.postCommit(false); +// we only enforce a checkpoint if the transaction buffers are full +// to avoid unnecessary flushing of stores under EOS +final boolean enforceCheckpoint = maxUncommittedStateBytes > -1 && tasks.approximateUncommittedStateBytes() >= maxUncommittedStateBytes; +task.postCommit(enforceCheckpoint); Review Comment: Same as above. That said, I'm not convinced this particular change is necessary. This code runs during a rebalance, and AFAIK no records are processed during a rebalance, so the transaction buffer shouldn't be able to grow since the last time we checked in the `StreamThread` main run-loop (aka. `StreamThread#maybeCommit`). Worst case, if we leave this as `task.postCommit(false)` and the state stores aren't flushed during the rebalance, a transaction buffer that is already close to exceeding its capacity will be detected during the next iteration of the run-loop.
[jira] [Assigned] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim reassigned KAFKA-16212: - Assignee: Omnia Ibrahim > Cache partitions by TopicIdPartition instead of TopicPartition > -- > > Key: KAFKA-16212 > URL: https://issues.apache.org/jira/browse/KAFKA-16212 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > From the discussion in [PR > 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it > would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of > {{TopicPartition}} to avoid ambiguity.
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473141072

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
## @@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection tasksToCo
if (task.commitNeeded()) {
task.clearTaskTimeout();
++committed;
-task.postCommit(false);
+// under EOS, we need to enforce a checkpoint if our transaction buffers will exceeded their capacity

Review Comment:
Actually, at this point we've only committed our Kafka transaction. The StateStore has not been committed/flushed, so our transaction buffers will still be full.

`task.postCommit` is the process that "checkpoints" state stores. We need to enforce a checkpoint when the transaction buffer is full, because we only flush the StateStore when it is checkpointed. See `AbstractTask` line `99`.
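The distinction in this comment is that committing the Kafka transaction does not empty the store's transaction buffer. Only a checkpoint (which flushes the store) does. The toy model below makes that ordering concrete. `BufferedStore` is entirely hypothetical and only borrows the shape of `postCommit(enforceCheckpoint)`. Its byte accounting is a crude assumption, not how Kafka Streams measures uncommitted state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Kafka Streams code: writes land in an in-memory
// transaction buffer and only reach the "persisted" map when the store is
// checkpointed, mirroring the comment that stores are flushed only on checkpoint.
final class BufferedStore {
    private final Map<String, String> buffer = new HashMap<>();
    private final Map<String, String> persisted = new HashMap<>();
    private long bufferedBytes = 0;

    void put(final String key, final String value) {
        buffer.put(key, value);
        // Crude size estimate: overwriting a key still adds to the total.
        bufferedBytes += key.length() + value.length();
    }

    long uncommittedBytes() {
        return bufferedBytes;
    }

    int persistedSize() {
        return persisted.size();
    }

    /** Same shape as task.postCommit(enforceCheckpoint): flush the buffer only when forced. */
    void postCommit(final boolean enforceCheckpoint) {
        if (enforceCheckpoint) {
            persisted.putAll(buffer);
            buffer.clear();
            bufferedBytes = 0;
        }
    }

    public static void main(final String[] args) {
        final BufferedStore store = new BufferedStore();
        store.put("k1", "v1");
        store.put("k2", "v2");
        // Committing the Kafka transaction here would not touch the store's buffer.
        store.postCommit(false);
        System.out.println(store.uncommittedBytes()); // 8: still buffered
        store.postCommit(true);
        System.out.println(store.uncommittedBytes()); // 0: flushed
        System.out.println(store.persistedSize());    // 2
    }
}
```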
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473126369

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
## @@ -1374,7 +1376,7 @@ public void signalResume() {
*/
int maybeCommit() {
final int committed;
-if (now - lastCommitMs > commitTimeMs) {
+if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {

Review Comment:
I've added a test for this in 0577905, but it feels like a bit of a tautology: it essentially just tests that `commit()` is called when `taskManager.needsCommit(anyBoolean())` returns `true`. The real meat of this will be in the `TaskManagerTests`, which I'm working on separately.
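The new condition in `maybeCommit()` commits either when the task manager asks for an early commit or when the commit interval has elapsed. The sketch below models just that trigger. `CommitTrigger` is a hypothetical class, and the `BooleanSupplier` stands in for `taskManager.needsCommit(true)`. Unlike the real method, it performs no commit itself and only reports whether one would be triggered.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, stripped-down model of the condition in the diff; not Kafka Streams code.
final class CommitTrigger {
    private final long commitTimeMs;
    private final BooleanSupplier needsCommit; // stands in for taskManager.needsCommit(true)
    private long lastCommitMs;

    CommitTrigger(final long commitTimeMs, final BooleanSupplier needsCommit, final long startMs) {
        this.commitTimeMs = commitTimeMs;
        this.needsCommit = needsCommit;
        this.lastCommitMs = startMs;
    }

    /** Returns true if a commit is triggered at time {@code now} (in ms). */
    boolean maybeCommit(final long now) {
        // needsCommit is the left operand, so it is evaluated on every call;
        // when it returns true, the interval check is skipped.
        if (needsCommit.getAsBoolean() || now - lastCommitMs > commitTimeMs) {
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final boolean[] buffersFull = {false};
        final CommitTrigger trigger = new CommitTrigger(100, () -> buffersFull[0], 0);
        System.out.println(trigger.maybeCommit(50));  // false: interval not elapsed, buffers not full
        System.out.println(trigger.maybeCommit(101)); // true: interval elapsed
        buffersFull[0] = true;
        System.out.println(trigger.maybeCommit(102)); // true: early commit requested
    }
}
```

Since the early-commit check sits on the left of `||`, it runs on every pass of the run-loop. In this model, that suggests the check should stay cheap.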
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280

Thanks for the in-depth analysis! I think part of the problem here stems from trying to make our internal `Future`-based API cooperate with Java's `CompletableFuture` API. I've sketched out something below that doesn't rely on `CompletableFuture` and (I believe) preserves the semantics we want for asynchronicity:

```java
public class ConnectorOffsetBackingStore implements OffsetBackingStore {

    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        final OffsetBackingStore primaryStore;
        final OffsetBackingStore secondaryStore;
        if (connectorStore.isPresent()) {
            primaryStore = connectorStore.get();
            secondaryStore = workerStore.orElse(null);
        } else if (workerStore.isPresent()) {
            primaryStore = workerStore.get();
            secondaryStore = null;
        } else {
            // Should never happen since we check for this case in the constructor, but just in case, this should
            // be more informative than the NPE that would otherwise be thrown
            throw new IllegalStateException("At least one non-null offset store must be provided");
        }

        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                tombstoneOffsets.put(partition, null);
            } else {
                regularOffsets.put(partition, offset);
            }
        });

        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
            return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                if (tombstoneWriteError != null) {
                    log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                    try (LoggingContext context = loggingContext()) {
                        callback.onCompletion(tombstoneWriteError, ignored);
                    }
                    return;
                }
                setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            });
        } else {
            return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
        }
    }

    private Future<Void> setPrimaryThenSecondary(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
            Callback<Void> callback
    ) {
        return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
            if (secondaryStore != null) {
                if (primaryWriteError != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                } else {
                    try {
                        // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
                        // backing store.
                        secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                            try (LoggingContext context = loggingContext()) {
                                if (secondaryWriteError != null) {
                                    log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", e);
                    }
                }
            }
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(primaryWriteError, ignored);
            }
        });
    }
}
```

It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` fields.

If this looks acceptable, I think the only question left is whether out-of-order writes are possible because of how things are chained. I believe this is only a problem for non-exactly-once source tasks, since we only have at most one in-flight offset commit at a time when exactly-once support is enabled. I also believe it should be handled gracefully by `OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of eyes to make sure.
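The ordering in this proposal can be exercised without Kafka by swapping in synchronous in-memory stores. First, tombstones go to the secondary store. If that write succeeds, everything goes to the primary store. Finally, non-tombstones go to the secondary store on a fire-and-forget basis. In this sketch, `FakeStore` and `OffsetWriteOrdering` are hypothetical. Callbacks are modeled with `BiConsumer<Throwable, Void>` rather than Connect's `Callback` interface, keys and values are `String`s instead of `ByteBuffer`s, and the returned futures are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory store, not Kafka's OffsetBackingStore: every write
// completes synchronously, is appended to a shared log, and fails if told to.
final class FakeStore {
    private final String name;
    private final List<String> log;
    private final boolean fail;

    FakeStore(final String name, final List<String> log, final boolean fail) {
        this.name = name;
        this.log = log;
        this.fail = fail;
    }

    void set(final Map<String, String> values, final BiConsumer<Throwable, Void> callback) {
        log.add(name + ":" + values.size());
        callback.accept(fail ? new RuntimeException(name + " write failed") : null, null);
    }
}

// Sketch of the proposed ordering: tombstones to the secondary store first,
// then everything to the primary store, then non-tombstones to the secondary store.
final class OffsetWriteOrdering {
    static void set(final FakeStore primary, final FakeStore secondary,
                    final Map<String, String> values, final BiConsumer<Throwable, Void> callback) {
        final Map<String, String> regular = new HashMap<>();
        final Map<String, String> tombstones = new HashMap<>();
        values.forEach((key, value) -> {
            if (value == null) {
                tombstones.put(key, null);
            } else {
                regular.put(key, value);
            }
        });
        if (secondary != null && !tombstones.isEmpty()) {
            secondary.set(tombstones, (tombstoneError, ignored) -> {
                if (tombstoneError != null) {
                    callback.accept(tombstoneError, null); // primary store is never written
                    return;
                }
                primaryThenSecondary(primary, secondary, values, regular, callback);
            });
        } else {
            primaryThenSecondary(primary, secondary, values, regular, callback);
        }
    }

    private static void primaryThenSecondary(final FakeStore primary, final FakeStore secondary,
                                             final Map<String, String> all, final Map<String, String> regular,
                                             final BiConsumer<Throwable, Void> callback) {
        primary.set(all, (primaryError, ignored) -> {
            if (secondary != null && primaryError == null) {
                // Fire-and-forget: the proposal only logs secondary failures; this sketch ignores them.
                secondary.set(regular, (secondaryError, ignored2) -> { });
            }
            callback.accept(primaryError, null);
        });
    }

    public static void main(final String[] args) {
        final List<String> log = new ArrayList<>();
        final Map<String, String> offsets = new HashMap<>();
        offsets.put("partition-a", "42");
        offsets.put("partition-b", null); // tombstone
        set(new FakeStore("primary", log, false), new FakeStore("secondary", log, false), offsets,
            (error, ignored) -> System.out.println("error = " + error));
        System.out.println(log); // [secondary:1, primary:2, secondary:1]
    }
}
```

Because these fake stores complete synchronously, the sketch cannot reproduce the out-of-order question raised above. That question only arises when several writes are in flight on real asynchronous stores.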
Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]
jlprat commented on code in PR #15279:
URL: https://github.com/apache/kafka/pull/15279#discussion_r1473125018

## core/src/main/scala/kafka/network/RequestChannel.scala:
## @@ -92,18 +92,18 @@ object RequestChannel extends Logging {
val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
-@volatile var requestDequeueTimeNanos = -1L
-@volatile var apiLocalCompleteTimeNanos = -1L
-@volatile var responseCompleteTimeNanos = -1L
-@volatile var responseDequeueTimeNanos = -1L
-@volatile var messageConversionsTimeNanos = 0L
-@volatile var apiThrottleTimeMs = 0L
-@volatile var temporaryMemoryBytes = 0L
+@volatile var requestDequeueTimeNanos: Long = -1L
+@volatile var apiLocalCompleteTimeNanos: Long = -1L
+@volatile var responseCompleteTimeNanos: Long = -1L
+@volatile var responseDequeueTimeNanos: Long = -1L
+@volatile var messageConversionsTimeNanos: Long = 0L
+@volatile var apiThrottleTimeMs: Long = 0L
+@volatile var temporaryMemoryBytes: Long = 0L
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
@volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
@volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
-val session = Session(context.principal, context.clientAddress)
+val session: Session = Session(context.principal, context.clientAddress)

Review Comment:
The changes made in this PR reflect the current idiomatic way to write Scala (as of Scala 2.12 and 2.13). For example:

- Boolean arguments should be passed as named arguments.
- Parameterless idempotent methods can omit the parentheses.
- Parameterless methods with side effects should keep the parentheses.

And so on.