[jira] [Resolved] (KAFKA-16101) KRaft migration rollback documentation is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-16101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16101. --- Resolution: Fixed > KRaft migration rollback documentation is incorrect > --- > > Key: KAFKA-16101 > URL: https://issues.apache.org/jira/browse/KAFKA-16101 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.1 >Reporter: Paolo Patierno >Assignee: Colin McCabe >Priority: Blocker > Fix For: 3.7.0 > > > Hello, > I was trying the KRaft migration rollback procedure locally and I came across > a potential bug, or at least a situation where the cluster is not > usable/available for a certain amount of time. > In order to test the procedure, I start with a cluster of one broker (broker ID = 0) and > one ZooKeeper node. Then I start the migration with one KRaft > controller node (broker ID = 1). The migration runs fine and reaches the > "dual write" state. > From this point, I try to run the rollback procedure as described in the > documentation. > As a first step, this involves:
> * stopping the broker > * removing the __cluster_metadata folder > * removing the ZooKeeper migration flag and the controller-related configuration > from the broker > * restarting the broker > With the above steps done, the broker starts in ZooKeeper mode (no migration, > no knowledge of the KRaft controllers) and keeps logging the following messages > at DEBUG level: > {code:java} > [2024-01-08 11:51:20,608] DEBUG > [zk-broker-0-to-controller-forwarding-channel-manager]: Controller isn't > cached, looking for local metadata changes > (kafka.server.BrokerToControllerRequestThread) > [2024-01-08 11:51:20,608] DEBUG > [zk-broker-0-to-controller-forwarding-channel-manager]: No controller > provided, retrying after backoff > (kafka.server.BrokerToControllerRequestThread) > [2024-01-08 11:51:20,629] DEBUG > [zk-broker-0-to-controller-alter-partition-channel-manager]: Controller isn't > cached, looking for local metadata changes > (kafka.server.BrokerToControllerRequestThread) > [2024-01-08 11:51:20,629] DEBUG > [zk-broker-0-to-controller-alter-partition-channel-manager]: No controller > provided, retrying after backoff > (kafka.server.BrokerToControllerRequestThread) {code} > What's happening should be clear. > The /controller znode in ZooKeeper still reports the KRaft controller (broker > ID = 1) as the controller. The broker gets it from the znode but doesn't know how > to reach it. > The issue is that until the procedure is fully completed with the next > steps (shutting down the KRaft controller, deleting the /controller znode), the > cluster is unusable. Any admin or client operation against the broker doesn't > work; it just hangs because the broker doesn't reply. > Extending this scenario to a more complex one with 10, 20, or 50 brokers and > partition replicas spread across them, when the brokers are rolled one by > one (in ZK mode) and start logging the above messages, the topics will become > unavailable one after the other, until all brokers are in such a state and > nothing can work. 
This is because, from the perspective of the KRaft controller (still > running), the brokers are no longer available and the partition replicas > are out of sync. > Of course, as soon as you complete the rollback procedure by deleting the > /controller znode, the brokers are able to elect a new controller among themselves > and everything recovers. > My first question: isn't the cluster supposed to keep working and > remain available during the rollback, even while the procedure is not > completed yet? Or is unavailability an accepted assumption during > the rollback, until it's fully completed? > This "unavailability" time window could be reduced by deleting the > /controller znode before shutting down the KRaft controllers, allowing the > brokers to elect a new controller among themselves. In this case, though, could there > be a race condition where KRaft controllers that are still running steal > leadership again? > Or is something missing from the documentation that leads to > this problem? -- This message was sent by Atlassian Jira (v8.20.10#820010)
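The failure mode in KAFKA-16101 can be made concrete with a toy model. It assumes, as the report states, that the ZK-mode broker reads the controller ID from /controller but has no endpoint for the KRaft controller's ID. The class and method names are hypothetical; this is not how Kafka's broker code is structured.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (hypothetical, not Kafka code) of a ZK-mode broker resolving the
// controller ID stored in the /controller znode against the brokers it knows.
final class ControllerLookupModel {
    // brokerId -> endpoint, as learned from the ZooKeeper broker registrations
    private final Map<Integer, String> knownBrokers = new HashMap<>();
    // Contents of /controller; empty means the znode has been deleted
    private Optional<Integer> controllerZnode = Optional.empty();

    void registerBroker(int id, String endpoint) { knownBrokers.put(id, endpoint); }

    void writeControllerZnode(int id) { controllerZnode = Optional.of(id); }

    void deleteControllerZnode() { controllerZnode = Optional.empty(); }

    // Endpoint to forward requests to; empty corresponds to
    // "No controller provided, retrying after backoff".
    Optional<String> resolveController() {
        return controllerZnode.map(knownBrokers::get);
    }

    // ZK-style election: a candidate becomes controller only if the znode is
    // absent, so whoever writes it first wins.
    boolean tryElect(int candidateId) {
        if (controllerZnode.isPresent()) return false;
        controllerZnode = Optional.of(candidateId);
        return true;
    }

    public static void main(String[] args) {
        ControllerLookupModel zk = new ControllerLookupModel();
        zk.registerBroker(0, "localhost:9092");
        zk.writeControllerZnode(1);                 // KRaft controller (ID 1) still recorded
        System.out.println(zk.resolveController()); // Optional.empty
        zk.deleteControllerZnode();                 // final rollback step
        zk.tryElect(0);                             // broker 0 claims the controller role
        System.out.println(zk.resolveController()); // Optional[localhost:9092]
    }
}
```

In this model, deleting /controller before stopping the KRaft controller only helps if a broker wins `tryElect` first; if a still-running controller rewrites the znode, the brokers are stuck again, which is the race the reporter asks about.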
[jira] [Assigned] (KAFKA-16037) Upgrade existing system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16037: - Assignee: Dongnuo Lyu (was: Kirk True) > Upgrade existing system tests to use new consumer > - > > Key: KAFKA-16037 > URL: https://issues.apache.org/jira/browse/KAFKA-16037 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Dongnuo Lyu >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > This task is to parameterize the tests to run twice: once for the old and once for the > new Consumer.
[jira] [Updated] (KAFKA-16208) Design new Consumer timeout policy
[ https://issues.apache.org/jira/browse/KAFKA-16208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16208: -- Priority: Blocker (was: Critical) > Design new Consumer timeout policy > -- > > Key: KAFKA-16208 > URL: https://issues.apache.org/jira/browse/KAFKA-16208 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > This task is to design and document the timeout policy for the new Consumer > implementation. > The documentation lives here: > https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts
[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15283: -- Priority: Major (was: Minor) > Client support for OffsetFetch and OffsetCommit with topic ID > - > > Key: KAFKA-15283 > URL: https://issues.apache.org/jira/browse/KAFKA-15283 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, newbie, offset > Fix For: 3.8.0 > > > Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory > {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and > {{METADATA}} RPC calls. > With KIP-848, OffsetFetch and OffsetCommit will start using topic IDs in > the same way, so the new client implementation will provide them when issuing > those requests. Topic names should continue to be supported as needed by the > {{AdminClient}}. > We should also review/clean up the support for topic names in requests such > as the {{METADATA}} request (currently supporting topic names as well as > topic IDs on the client side). > Tasks include: > * Introduce topic IDs in the existing OffsetFetch and OffsetCommit APIs, which will > be upgraded on the server to support topic IDs > * Check topic ID propagation internally in the client based on the RPCs > that include it. > * Review existing support for topic names for potential cleanup if no longer needed.
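The two identification modes described in KAFKA-15283 (topic ID for the new consumer, topic name where only names are available) can be sketched as follows. This is a minimal illustration under stated assumptions: `TopicRef`, `forConsumer` and `forAdmin` are hypothetical names, not Kafka's request or metadata classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch (not Kafka's request classes): how a client might pick the
// topic identifier for an OffsetFetch/OffsetCommit entry.
final class TopicIdentifierSketch {
    static final class TopicRef {
        final Optional<UUID> topicId;   // used by requests that support topic IDs
        final String topicName;         // kept for callers that only know names

        TopicRef(Optional<UUID> topicId, String topicName) {
            this.topicId = topicId;
            this.topicName = topicName;
        }

        @Override
        public String toString() {
            return topicId.map(UUID::toString).orElse("<no id>") + "/" + topicName;
        }
    }

    // In-memory name -> ID cache, in the spirit of ConsumerMetadata
    private final Map<String, UUID> topicIds = new HashMap<>();

    void onMetadataUpdate(String topicName, UUID topicId) { topicIds.put(topicName, topicId); }

    // Consumer path: attach the ID when metadata already knows it. A real client
    // would presumably need a metadata refresh first when it does not.
    TopicRef forConsumer(String topicName) {
        return new TopicRef(Optional.ofNullable(topicIds.get(topicName)), topicName);
    }

    // Admin-style path: identify the topic by name only.
    static TopicRef forAdmin(String topicName) {
        return new TopicRef(Optional.empty(), topicName);
    }

    public static void main(String[] args) {
        TopicIdentifierSketch metadata = new TopicIdentifierSketch();
        metadata.onMetadataUpdate("orders", UUID.randomUUID());
        System.out.println(metadata.forConsumer("orders"));
        System.out.println(forAdmin("orders"));
    }
}
```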
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16156: -- Priority: Blocker (was: Major) > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. 
Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > 
org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.j
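The KAFKA-16156 trace suggests that `endOffsets()` goes through `OffsetFetcherUtils.buildOffsetsForTimesResult`, which wraps each entry in `OffsetAndTimestamp`, while the ListOffsets response for this lookup carries `timestamp=-1`. A minimal sketch of keeping the two result shapes apart; the class names are hypothetical, and `OffsetAndTimestampLike` reproduces only the timestamp check visible in the trace. This is not the actual fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual fix: keep the endOffsets() result shape
// separate from the offsetsForTimes() shape, which validates timestamps.
final class EndOffsetsSketch {
    // One partition's entry from a ListOffsets response
    static final class ListOffsetsResult {
        final long offset;
        final long timestamp;   // -1 for the end-offset lookup in the trace

        ListOffsetsResult(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for OffsetAndTimestamp, reproducing only the check seen in the trace
    static final class OffsetAndTimestampLike {
        final long offset;
        final long timestamp;

        OffsetAndTimestampLike(long offset, long timestamp) {
            if (timestamp < 0) throw new IllegalArgumentException("Invalid negative timestamp");
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // offsetsForTimes-style result: wraps every entry, so timestamp -1 fails
    static Map<String, OffsetAndTimestampLike> asOffsetsForTimes(Map<String, ListOffsetsResult> fetched) {
        Map<String, OffsetAndTimestampLike> out = new HashMap<>();
        fetched.forEach((tp, r) -> out.put(tp, new OffsetAndTimestampLike(r.offset, r.timestamp)));
        return out;
    }

    // endOffsets-style result: only the offset is needed; the timestamp is ignored
    static Map<String, Long> asOffsets(Map<String, ListOffsetsResult> fetched) {
        Map<String, Long> out = new HashMap<>();
        fetched.forEach((tp, r) -> out.put(tp, r.offset));
        return out;
    }

    public static void main(String[] args) {
        // Values from the trace: input-topic-0, offset 42804, timestamp -1
        Map<String, ListOffsetsResult> fetched =
                Map.of("input-topic-0", new ListOffsetsResult(42804L, -1L));
        System.out.println(asOffsets(fetched));   // {input-topic-0=42804}
        try {
            asOffsetsForTimes(fetched);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());   // Invalid negative timestamp
        }
    }
}
```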
[jira] [Updated] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16178: -- Priority: Blocker (was: Critical) > AsyncKafkaConsumer doesn't retry joining the group after rediscovering group > coordinator > > > Key: KAFKA-16178 > URL: https://issues.apache.org/jira/browse/KAFKA-16178 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Dongnuo Lyu >Assignee: Philip Nee >Priority: Blocker > Labels: client-transitions-issues, consumer-threading-refactor > Fix For: 3.8.0 > > Attachments: pkc-devc63jwnj_jan19_0_debug > > > {code:java} > [2024-01-17 21:34:59,500] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the > group coordinator > Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator > again and retry in 0ms: This is not the correct coordinator. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Group coordinator > b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: > null) is unavailable or invalid due to cause: This is not the correct > coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136) > [2024-01-17 21:34:59,882] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code} > Some of the consumers don't consume any messages. The logs show that after the > consumer starts up and successfully logs in: > # The consumer discovers the group coordinator. > # The heartbeat to join the group fails because "This is not the correct > coordinator". > # The consumer rediscovers the group coordinator. > Another heartbeat should follow the rediscovery of the group coordinator, but > no logs show any sign of a heartbeat request. > On the server side, there are no logs at all about the group ID. One > suspicion is that the consumer doesn't send a heartbeat request after > rediscovering the group coordinator.
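The expected discover/heartbeat/rediscover/heartbeat sequence for KAFKA-16178 can be written down as a toy model. Names are hypothetical and this is not the actual request-manager code. The key assumption is that the NOT_COORDINATOR handler leaves a heartbeat pending, so the first poll after rediscovery sends it. If something cleared that pending state, the symptom in the logs would follow, but that is a guess, not a diagnosis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical toy model of the expected discover/heartbeat sequence; it is not
// the HeartbeatRequestManager or CoordinatorRequestManager implementation.
final class RejoinAfterRediscoverySketch {
    private Optional<Integer> coordinator = Optional.empty();
    private boolean heartbeatPending = true;          // member wants to join the group
    final List<String> sent = new ArrayList<>();      // requests "sent", in order

    void onCoordinatorDiscovered(int nodeId) { coordinator = Optional.of(nodeId); }

    // NOT_COORDINATOR: drop the coordinator but keep the heartbeat pending, so it
    // is retried as soon as a coordinator is known again ("retry in 0ms").
    void onNotCoordinator() {
        coordinator = Optional.empty();
        heartbeatPending = true;
    }

    // One iteration of the background thread's loop
    void poll() {
        if (!coordinator.isPresent()) {
            sent.add("FindCoordinator");
            return;
        }
        if (heartbeatPending) {
            sent.add("ConsumerGroupHeartbeat->node" + coordinator.get());
            heartbeatPending = false;
        }
    }

    public static void main(String[] args) {
        RejoinAfterRediscoverySketch member = new RejoinAfterRediscoverySketch();
        member.poll();                       // FindCoordinator
        member.onCoordinatorDiscovered(3);
        member.poll();                       // first join heartbeat
        member.onNotCoordinator();           // "This is not the correct coordinator"
        member.poll();                       // FindCoordinator again
        member.onCoordinatorDiscovered(3);
        member.poll();                       // the heartbeat missing from the logs
        System.out.println(member.sent);
    }
}
```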
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Priority: Blocker (was: Major) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and > {{AsyncKafkaConsumer}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches. 
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.value(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } catch (InterruptedException | ExecutionException e) { > throw new KafkaException(e); > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{TimeoutException}}. > h3. Suggested fix > TBD :(
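The two snippets above can be turned into a runnable comparison using only `java.util.concurrent`. This is a stdlib-only sketch: `pollOnce` and the 5 ms background delay are hypothetical stand-ins for `NetworkClient.poll()` and the consumer's background network thread, not Kafka code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stdlib-only illustration of the two timeout styles; names are hypothetical.
final class TimeoutStylesDemo {
    static final int RESULT = 42;

    // Stand-in for NetworkClient.poll(): one round of I/O on the calling thread,
    // which here completes the request immediately.
    static void pollOnce(CompletableFuture<Integer> future) {
        future.complete(RESULT);
    }

    // Legacy style: do the work, check the result, and only then check the timer.
    static int legacyGetCount(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            pollOnce(future);
            if (future.isDone()) return future.join();
        } while (System.nanoTime() - deadline < 0);
        return -1;
    }

    // Async style: hand the work to a background thread and block with the timeout.
    static int asyncGetCount(ExecutorService backgroundThread, long timeoutMs) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        backgroundThread.execute(() -> {
            try {
                Thread.sleep(5);                 // simulated network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.complete(RESULT);
        });
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;                           // with timeoutMs == 0 this happens at once
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            System.out.println("legacy, timeout 0:  " + legacyGetCount(0));
            System.out.println("async,  timeout 0:  " + asyncGetCount(background, 0));
            System.out.println("async,  timeout 1s: " + asyncGetCount(background, 1000));
        } finally {
            background.shutdownNow();
        }
    }
}
```

With a zero timeout, the legacy style still gets one pass of I/O before the timer is consulted, while `Future.get(0, ...)` gives the background thread no time at all. A fix would need to decide which of these semantics the public API promises.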
[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16111: -- Priority: Blocker (was: Major) > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Priority: Blocker > Labels: callback, consumer-threading-refactor > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design
[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval
[ https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16165: -- Priority: Blocker (was: Major) > Consumer invalid transition on expired poll interval > > > Key: KAFKA-16165 > URL: https://issues.apache.org/jira/browse/KAFKA-16165 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Blocker > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Running system tests with the new async consumer revealed an invalid > transition related to the consumer not being polled within the interval in some > scenario (possibly related to consumer close, as the transition is > LEAVING->STALE). > Log trace: > [2024-01-17 19:45:07,379] WARN [Consumer > clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, > groupId=consumer-groups-test-2] consumer poll timeout has expired. This means > the time between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2024-01-17 19:45:07,379] ERROR [Consumer > clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, > groupId=consumer-groups-test-2] Unexpected error caught in consumer network > thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) > java.lang.IllegalStateException: Invalid state transition from LEAVING to > STALE > at > org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) > at > org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) > at > org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) > at > java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
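The KAFKA-16165 error comes from a state machine that rejects a transition not in its allowed set. The toy below reproduces that shape and shows one possible guard. The states are a subset of those named in the trace, but the transition table and the guard are illustrative assumptions. They are not `MembershipManagerImpl`'s actual rules or the project's fix.

```java
import java.util.EnumSet;
import java.util.Map;

// Toy state machine (hypothetical): the allowed transitions below are
// illustrative assumptions, not MembershipManagerImpl's actual rules.
final class MemberStateSketch {
    enum State { STABLE, LEAVING, STALE, UNSUBSCRIBED }

    private static final Map<State, EnumSet<State>> ALLOWED = Map.of(
            State.STABLE, EnumSet.of(State.LEAVING, State.STALE),
            State.LEAVING, EnumSet.of(State.UNSUBSCRIBED),
            State.STALE, EnumSet.of(State.UNSUBSCRIBED),
            State.UNSUBSCRIBED, EnumSet.noneOf(State.class));

    private State state;

    MemberStateSketch(State initial) { this.state = initial; }

    State state() { return state; }

    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next))
            throw new IllegalStateException("Invalid state transition from " + state + " to " + next);
        state = next;
    }

    // One possible guard (an assumption): a member that is already leaving
    // does not need to go STALE when the poll timer expires.
    void onPollTimerExpired() {
        if (state == State.LEAVING) return;
        transitionTo(State.STALE);
    }

    public static void main(String[] args) {
        MemberStateSketch leaving = new MemberStateSketch(State.LEAVING);
        try {
            leaving.transitionTo(State.STALE);   // same error as in the trace
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        leaving.onPollTimerExpired();            // guarded path: no exception
        System.out.println(leaving.state());     // LEAVING
    }
}
```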
[jira] [Resolved] (KAFKA-16186) Implement broker metrics for client telemetry usage
[ https://issues.apache.org/jira/browse/KAFKA-16186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-16186. - Fix Version/s: 3.8.0 Resolution: Fixed merged the PR to trunk > Implement broker metrics for client telemetry usage > --- > > Key: KAFKA-16186 > URL: https://issues.apache.org/jira/browse/KAFKA-16186 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.8.0 > > > KIP-714 lists new broker metrics that record the usage of client > telemetry instances and plugins. Implement the broker metrics as defined in > KIP-714.
[jira] [Commented] (KAFKA-15772) Flaky test TransactionsWithTieredStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-15772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812485#comment-17812485 ] Apoorv Mittal commented on KAFKA-15772: --- Failure of test: `testAbortTransactionTimeout` in `TransactionsWithTieredStoreTest` class https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15251/7/tests {code:java} Error: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 3000ms while awaiting InitProducerId Stacktrace: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 3000ms while awaiting InitProducerId Standard Output: [2024-01-30 16:29:01,250] INFO [LocalTieredStorage Id=0] Creating directory: [/tmp/kafka-remote-tier-transactionswithtieredstoretest11967450412731752897/kafka-tiered-storage] (org.apache.kafka.server.log.remote.storage.LocalTieredStorage:289) [2024-01-30 16:29:01,250] INFO [LocalTieredStorage Id=0] Created local tiered storage manager [0]:[kafka-tiered-storage] (org.apache.kafka.server.log.remote.storage.LocalTieredStorage:301) [2024-01-30 16:29:01,251] INFO Started configuring topic-based RLMM with configs: {remote.log.metadata.topic.replication.factor=3, remote.log.metadata.topic.num.partitions=3, remote.log.metadata.common.client.bootstrap.servers=localhost:40061, broker.id=0, remote.log.metadata.initialization.retry.interval.ms=300, remote.log.metadata.common.client.security.protocol=PLAINTEXT, cluster.id=z_bOu1YoRbKNNIThjztsdA, log.dir=/tmp/kafka-6827936654389854503} (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:358) [2024-01-30 16:29:01,251] INFO Successfully configured topic-based RLMM with config: TopicBasedRemoteLogMetadataManagerConfig{clientIdPrefix='__remote_log_metadata_client_0', metadataTopicPartitionsCount=3, consumeWaitMs=12, metadataTopicRetentionMs=-1, metadataTopicReplicationFactor=3, initializationRetryMaxTimeoutMs=12, initializationRetryIntervalMs=300, 
commonProps={security.protocol=PLAINTEXT, bootstrap.servers=localhost:40061}, consumerProps={security.protocol=PLAINTEXT, key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, enable.auto.commit=false, bootstrap.servers=localhost:40061, exclude.internal.topics=false, auto.offset.reset=earliest, client.id=__remote_log_metadata_client_0_consumer}, producerProps={security.protocol=PLAINTEXT, enable.idempotence=true, value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, acks=all, bootstrap.servers=localhost:40061, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, client.id=__remote_log_metadata_client_0_producer}} (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:364) [2024-01-30 16:29:01,252] INFO Initializing topic-based RLMM resources (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:377) [2024-01-30 16:29:01,363] INFO Topic __remote_log_metadata does not exist. Error: This server does not host this topic-partition. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:466) [2024-01-30 16:29:01,366] ERROR Encountered error while creating __remote_log_metadata topic. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:528) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1. 
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.createTopic(TopicBasedRemoteLogMetadataManager.java:509) at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.initializeResources(TopicBasedRemoteLogMetadataManager.java:396) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1. [2024-01-30 16:29:01,366] INFO Sleep for 300 ms before it is retried again. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager:401) [2024-01-30 16:29:01,538] WARN [LocalTieredStorage Id=1] Remote storage with ID [/tmp/kafka-remote-tier-transactionswithtieredstoretest11967450412731752897] already exists on the file system. Any data already in the remote storage will not be deleted and may result in an inconsistent state and/or provide stale data. (org.apache.kafka.server.log.remote
[jira] [Updated] (KAFKA-15553) Review consumer positions update
[ https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15553: -- Labels: consumer-threading-refactor position (was: consumer-threading-refactor) > Review consumer positions update > > > Key: KAFKA-15553 > URL: https://issues.apache.org/jira/browse/KAFKA-15553 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, position > Fix For: 3.8.0 > > > From the existing comment: If there are any partitions which do not have a > valid position and are not awaiting reset, then we need to fetch committed > offsets. > In the async consumer: I wonder if it would make sense to refresh the > position on the event loop continuously. > The logic to refresh offsets in the poll loop is quite fragile and works > largely by side-effects of the code that it calls. For example, the behaviour > of the "cached" value is really not that straightforward and simply reading > the cached value is not sufficient to start consuming data in all cases. > This area needs a bit of a refactor.
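The rule quoted in KAFKA-15553 ("partitions which do not have a valid position and are not awaiting reset" need committed offsets) can be expressed as an explicit filter. This is a toy sketch with illustrative state names, not Kafka's `SubscriptionState`. Evaluating such a check on every event-loop iteration, as the description wonders about, would make the refresh explicit rather than a side effect.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy sketch (hypothetical): the quoted rule as a filter over per-partition
// position states. The state names are illustrative only.
final class PositionRefreshSketch {
    enum PositionState { VALID, AWAITING_RESET, INITIALIZING }

    // No valid position AND not awaiting reset -> fetch committed offsets.
    static List<String> partitionsNeedingCommittedOffsets(Map<String, PositionState> states) {
        return states.entrySet().stream()
                .filter(e -> e.getValue() != PositionState.VALID)
                .filter(e -> e.getValue() != PositionState.AWAITING_RESET)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, PositionState> states = new LinkedHashMap<>();
        states.put("t-0", PositionState.VALID);
        states.put("t-1", PositionState.AWAITING_RESET);
        states.put("t-2", PositionState.INITIALIZING);
        System.out.println(partitionsNeedingCommittedOffsets(states));   // [t-2]
    }
}
```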
[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15652: -- Labels: consumer-threading-refactor position (was: consumer-threading-refactor) > Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp() > > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, position > Fix For: 3.8.0 > > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{FetchCollector.handleInitializeErrors()}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{poll}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{OffsetOutOfRangeException}}. > {quote} > I intentionally changed the code so that no exceptions were thrown in > {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an > empty map. When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case.
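A test for the KAFKA-15652 edge case would pin down what the initializing path does when no reset policy is configured. The toy model below makes the asymmetry in the review comment concrete. Names are hypothetical, and `IllegalStateException` stands in for `OffsetOutOfRangeException`. The -2/-1 values are the ListOffsets sentinel timestamps for earliest and latest.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names) of the two paths into "reset" contrasted in the
// review comment, for a partition whose reset policy is NONE.
final class ResetPolicyPathsSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // ListOffsets sentinel timestamps: -2 = earliest, -1 = latest
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Initializing path, as described for getOffsetResetTimestamp(): partitions
    // with no policy are silently left out, so they stay awaiting reset.
    static Map<String, Long> resetTimestamps(Map<String, ResetPolicy> awaitingReset) {
        Map<String, Long> out = new HashMap<>();
        awaitingReset.forEach((tp, policy) -> {
            if (policy == ResetPolicy.EARLIEST) out.put(tp, EARLIEST_TIMESTAMP);
            else if (policy == ResetPolicy.LATEST) out.put(tp, LATEST_TIMESTAMP);
        });
        return out;
    }

    // Fetch path: an out-of-range error with no policy is surfaced to poll().
    // IllegalStateException stands in for OffsetOutOfRangeException here.
    static void onOffsetOutOfRange(String tp, ResetPolicy policy) {
        if (policy == ResetPolicy.NONE)
            throw new IllegalStateException("Offset out of range for " + tp + " and no reset policy");
    }

    public static void main(String[] args) {
        Map<String, ResetPolicy> awaiting = Map.of("t-0", ResetPolicy.NONE, "t-1", ResetPolicy.EARLIEST);
        System.out.println(resetTimestamps(awaiting));   // {t-1=-2}: t-0 is silently dropped
        try {
            onOffsetOutOfRange("t-0", ResetPolicy.NONE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```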
[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding
[ https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16023: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor) > PlaintextConsumerTest needs to wait for reconciliation to complete before > proceeding > > > Key: KAFKA-16023 > URL: https://issues.apache.org/jira/browse/KAFKA-16023 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > Several tests in PlaintextConsumerTest.scala (such as > testPerPartitionLagMetricsCleanUpWithSubscribe) use: > assertEquals(1, listener.callsToAssigned, "should be assigned once") > However, the timing of reconciliation completion is not deterministic due > to asynchronous processing, so we actually need to wait for the condition to > happen. > Another issue is the timeout: some of these tasks might not > complete within the 600ms timeout, so the tests are deemed to be flaky. -- This message was sent by Atlassian Jira (v8.20.10#820010)
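The problem is that an immediate `assertEquals` races with asynchronous reconciliation. The minimal sketch below shows the alternative. It uses a standalone helper, `WaitUtil.waitUntilTrue`, which is hypothetical here and not Kafka's own test utility. The helper re-checks the condition until it holds or a configurable timeout elapses. This turns the timeout budget mentioned in the ticket into an explicit parameter rather than an implicit limit.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper: re-checks a condition until it holds or time runs out.
final class WaitUtil {
    // Returns true as soon as the condition is observed, false on timeout.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                return false;
            }
            try {
                // Sleep for the poll interval, but never past the deadline.
                Thread.sleep(Math.min(pollMs, Math.max(1L, remainingNs / 1_000_000L)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
    }
}
```

A test would then assert on the return value, for example `assertTrue(waitUntilTrue(() -> listener.callsToAssigned == 1, timeoutMs, 10))`, instead of asserting on the counter directly.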
[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
[ https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16019: -- Component/s: system tests > Some of the tests in PlaintextConsumer can't seem to deterministically invoke > and verify the consumer callback > -- > > Key: KAFKA-16019 > URL: https://issues.apache.org/jira/browse/KAFKA-16019 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > I was running the PlaintextConsumer tests against the async consumer; however, a > few tests were failing because they could not verify that the listener was invoked > correctly. > For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, > around 50% of the time, the listener's callsToAssigned was never incremented > correctly. Even after changing it to awaitUntilTrue, the result was the same. > {code:scala} > consumer.subscribe(List(topic, topic2).asJava, listener) > val records = awaitNonEmptyRecords(consumer, tp) > assertEquals(1, listener.callsToAssigned, "should be assigned once") {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
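One plausible explanation, not confirmed in the ticket, has to do with where callbacks run. If the async consumer only runs rebalance callbacks on the application thread when `poll()` is called, then waiting on `callsToAssigned` without polling can never succeed. The sketch below models that assumption with two pieces:

- a hypothetical `FakeAsyncConsumer` that queues callbacks and drains them in `poll()`;
- a wait loop, `AwaitAssignment.awaitAssigned` (also hypothetical), that keeps polling until the listener has fired.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a consumer whose rebalance callbacks are queued by a
// background thread and only run when the application thread calls poll().
final class FakeAsyncConsumer {
    private final ConcurrentLinkedQueue<Runnable> pendingCallbacks = new ConcurrentLinkedQueue<>();
    final AtomicInteger callsToAssigned = new AtomicInteger();

    // Background side: reconciliation finished, so enqueue the listener invocation.
    void completeReconciliation() {
        pendingCallbacks.add(callsToAssigned::incrementAndGet);
    }

    // Application side: run queued callbacks on the caller's thread, as poll() would.
    void poll() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }
}

final class AwaitAssignment {
    // Keeps calling poll() until the listener has fired or the timeout elapses.
    // Waiting on the counter without polling would never observe a change here.
    static boolean awaitAssigned(FakeAsyncConsumer consumer, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            consumer.poll();
            if (consumer.callsToAssigned.get() >= 1) {
                return true;
            }
            Thread.onSpinWait();
        } while (System.currentTimeMillis() < deadline);
        return false;
    }
}
```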
[jira] [Updated] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation
[ https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15993: -- Component/s: system tests > Enable max poll integration tests that depend on callback invocation > > > Key: KAFKA-15993 > URL: https://issues.apache.org/jira/browse/KAFKA-15993 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Philip Nee >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-preview, timeout > Fix For: 3.8.0 > > > We will enable integration tests using the async consumer in KAFKA-15971. > However, we should also enable tests that rely on rebalance listeners after > KAFKA-15628 is closed. One example would be testMaxPollIntervalMs, which > relies on the listener to verify correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding
[ https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16023: -- Component/s: system tests > PlaintextConsumerTest needs to wait for reconciliation to complete before > proceeding > > > Key: KAFKA-16023 > URL: https://issues.apache.org/jira/browse/KAFKA-16023 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > Several tests in PlaintextConsumerTest.scala (such as > testPerPartitionLagMetricsCleanUpWithSubscribe) use: > assertEquals(1, listener.callsToAssigned, "should be assigned once") > However, the timing of reconciliation completion is not deterministic due > to asynchronous processing, so we actually need to wait for the condition to > happen. > Another issue is the timeout: some of these tasks might not > complete within the 600ms timeout, so the tests are deemed to be flaky. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
[ https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16019: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor) > Some of the tests in PlaintextConsumer can't seem to deterministically invoke > and verify the consumer callback > -- > > Key: KAFKA-16019 > URL: https://issues.apache.org/jira/browse/KAFKA-16019 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > I was running the PlaintextConsumer tests against the async consumer; however, a > few tests were failing because they could not verify that the listener was invoked > correctly. > For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, > around 50% of the time, the listener's callsToAssigned was never incremented > correctly. Even after changing it to awaitUntilTrue, the result was the same. > {code:scala} > consumer.subscribe(List(topic, topic2).asJava, listener) > val records = awaitNonEmptyRecords(consumer, tp) > assertEquals(1, listener.callsToAssigned, "should be assigned once") {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation
[ https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15993: -- Labels: consumer-threading-refactor kip-848-preview timeout (was: consumer-threading-refactor kip-848-preview) > Enable max poll integration tests that depend on callback invocation > > > Key: KAFKA-15993 > URL: https://issues.apache.org/jira/browse/KAFKA-15993 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-preview, timeout > Fix For: 3.8.0 > > > We will enable integration tests using the async consumer in KAFKA-15971. > However, we should also enable tests that rely on rebalance listeners after > KAFKA-15628 is closed. One example would be testMaxPollIntervalMs, which > relies on the listener to verify correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15305: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor) > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > While working on https://issues.apache.org/jira/browse/KAFKA-15304 > > The close() API supplies a timeout parameter so that the consumer has a > grace period to process things before shutting down. The background thread > currently doesn't honor that: when close() is initiated, it immediately > closes all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. Maybe the correct thing to do is to first stop > accepting API requests, then let runOnce() continue to run until the > shutdown timer expires, and finally force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
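The three-step proposal above can be sketched as a hypothetical single-threaded model. The steps are: stop accepting requests, keep running while the timer allows, then force-close. `BackgroundLoop` and its methods are illustrative names. A real background thread would call `runOnce()` from its own loop rather than from `close()`. This version also stops early once the queue is empty, which is an assumption beyond what the ticket states.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a background event loop with a bounded, graceful close.
final class BackgroundLoop {
    private final Deque<Runnable> tasks = new ArrayDeque<>();
    private boolean closing = false;
    private boolean closed = false;

    // Step 1: once close() has started, new requests are rejected.
    synchronized boolean submit(Runnable task) {
        if (closing) {
            return false;
        }
        tasks.add(task);
        return true;
    }

    // One unit of work, analogous to runOnce(): returns false when idle.
    synchronized boolean runOnce() {
        Runnable task = tasks.poll();
        if (task == null) {
            return false;
        }
        task.run();
        return true;
    }

    // Steps 2 and 3: drain work until the queue is empty or the timer expires,
    // then release everything; returns how many tasks were abandoned.
    int close(long timeoutMs) {
        synchronized (this) {
            closing = true;
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline && runOnce()) {
            // keep processing remaining tasks within the grace period
        }
        synchronized (this) {
            int abandoned = tasks.size();
            tasks.clear();
            closed = true;
            return abandoned;
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```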
[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
[ https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16152: -- Component/s: system tests (was: unit tests) > Fix > PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart > -- > > Key: KAFKA-16152 > URL: https://issues.apache.org/jira/browse/KAFKA-16152 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
[ https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16152: -- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support system-tests) > Fix > PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart > -- > > Key: KAFKA-16152 > URL: https://issues.apache.org/jira/browse/KAFKA-16152 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16134: -- Component/s: system tests > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16135: -- Component/s: system tests > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > --- > > Key: KAFKA-16135 > URL: https://issues.apache.org/jira/browse/KAFKA-16135 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The test > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > is incredibly flaky - it failed 3 builds in a row for the 3.7 release > candidate, but with different JDK versions. Locally it also fails often and > requires a few retries to pass > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support system-tests) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16150: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15692) New integration tests to ensure full coverage
[ https://issues.apache.org/jira/browse/KAFKA-15692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15692: -- Component/s: system tests (was: unit tests) > New integration tests to ensure full coverage > - > > Key: KAFKA-15692 > URL: https://issues.apache.org/jira/browse/KAFKA-15692 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > These tests are meant to cover bugs that were discovered during PR reviews but not caught by tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16110: -- Labels: consumer-threading-refactor performance-benchmark (was: consumer-threading-refactor) > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15561: -- Labels: kip-848-client-support regex (was: kip-848-client-support) > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support, regex > Fix For: 3.8.0 > > > The new consumer should support subscribing with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the SubscribedTopicRegex field > of the HB request, delegating the resolution to the server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15538: -- Labels: kip-848-client-support newbie regex (was: kip-848-client-support newbie) > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When using subscribe with a Java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscriptionPattern`. Subscribing > with a Java `Pattern` will still be supported for a while but will eventually be removed. > * When subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
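The two subscription paths described above can be contrasted in a small hypothetical model:

- A Java `Pattern` is matched locally against topic names known from metadata.
- A `SubscriptionPattern`-style expression is passed through unresolved for the broker to evaluate. KAFKA-15561 names the heartbeat field for this `SubscribedTopicRegex`.

`SubscriptionPayload` is not a Kafka class. Whole-name matching with `Matcher.matches()` and sorting the result are assumptions of this sketch.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical summary of what a heartbeat would carry for each subscription style.
final class SubscriptionPayload {
    final List<String> topicNames; // set when the client resolves the regex itself
    final String topicRegex;       // set when the broker resolves the regex

    private SubscriptionPayload(List<String> topicNames, String topicRegex) {
        this.topicNames = topicNames;
        this.topicRegex = topicRegex;
    }

    // java.util.regex.Pattern: resolve locally against topics known from metadata.
    static SubscriptionPayload fromJavaPattern(Pattern pattern, Set<String> metadataTopics) {
        Set<String> matched = new TreeSet<>(); // sorted for a stable request
        for (String topic : metadataTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return new SubscriptionPayload(List.copyOf(matched), null);
    }

    // SubscriptionPattern-style subscription: pass the expression through unresolved.
    static SubscriptionPayload fromServerSideRegex(String regex) {
        return new SubscriptionPayload(null, regex);
    }
}
```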
[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
[ https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16011: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose > > > Key: KAFKA-16011 > URL: https://issues.apache.org/jira/browse/KAFKA-16011 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, > topic-1), Set(), Set(topic1-1, topic1-5)) > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376) > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to > FATAL state > 
(org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456) > [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to > (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139) > org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs > must be provided in first request. > [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
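The broker error in these logs states a concrete rule: `RebalanceTimeoutMs` must be provided in the first request. Whether or not that explains the assignment failure, the hypothetical sketch below shows a heartbeat builder that honors the rule. It rests on two assumptions:

- -1 means "unchanged since the last heartbeat";
- the member must send a full request again after a reset, for example after a fatal error.

`HeartbeatBuilder` and `HeartbeatFields` are illustrative, not the client's classes.

```java
// Hypothetical subset of consumer-group heartbeat fields; -1 means "not sent".
final class HeartbeatFields {
    final int memberEpoch;
    final int rebalanceTimeoutMs;

    HeartbeatFields(int memberEpoch, int rebalanceTimeoutMs) {
        this.memberEpoch = memberEpoch;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }
}

// Hypothetical builder that always includes the rebalance timeout when joining
// (epoch 0) or when no full request has been sent since the last reset.
final class HeartbeatBuilder {
    private final int rebalanceTimeoutMs;
    private boolean sentFullRequest = false;

    HeartbeatBuilder(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    HeartbeatFields build(int memberEpoch) {
        boolean include = memberEpoch == 0 || !sentFullRequest;
        if (include) {
            sentFullRequest = true;
        }
        return new HeartbeatFields(memberEpoch, include ? rebalanceTimeoutMs : -1);
    }

    // After a fatal error or fencing, the next request must be a full one again.
    void reset() {
        sentFullRequest = false;
    }
}
```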
[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
[ https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16011: -- Labels: kip-848-client-support timeout (was: consumer-threading-refactor kip-848-client-support system-tests) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose > > > Key: KAFKA-16011 > URL: https://issues.apache.org/jira/browse/KAFKA-16011 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-0, topic-0, > topic-1), Set(), Set(topic1-1, topic1-5)) > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376) > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to > FATAL state > 
(org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456) > [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to > (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139) > org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs > must be provided in first request. > [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16133) Commits during reconciliation always time out
[ https://issues.apache.org/jira/browse/KAFKA-16133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16133: -- Labels: consumer-threading-refactor reconciliation timeout (was: consumer-threading-refactor) > Commits during reconciliation always time out > - > > Key: KAFKA-16133 > URL: https://issues.apache.org/jira/browse/KAFKA-16133 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, reconciliation, timeout > Fix For: 3.8.0 > > > This only affects the AsyncKafkaConsumer, which is in Preview in 3.7. > In MembershipManagerImpl there is a confusion between timeouts and deadlines. > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L836C38-L836C38] > This causes all autocommits during reconciliation to immediately time out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
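To illustrate the distinction: a timeout is a relative duration, while a deadline is an absolute time. Passing a timeout where a deadline is expected compares a small number such as 30000 against the current epoch time, so the check reports expiry at once. This is one way such a mix-up produces immediate expiry, not a reconstruction of the code at the linked line. `Timing` is a hypothetical helper.

```java
// Hypothetical helpers illustrating the timeout/deadline distinction.
final class Timing {
    // Converts a relative timeout into an absolute deadline, saturating on overflow
    // (for example when timeoutMs is Long.MAX_VALUE).
    static long deadlineFrom(long nowMs, long timeoutMs) {
        long deadline = nowMs + timeoutMs;
        return deadline < nowMs ? Long.MAX_VALUE : deadline;
    }

    // Expects an absolute deadline, not a duration.
    static boolean isExpired(long nowMs, long deadlineMs) {
        return nowMs >= deadlineMs;
    }
}
```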
[jira] [Assigned] (KAFKA-16133) Commits during reconciliation always time out
[ https://issues.apache.org/jira/browse/KAFKA-16133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16133: - Assignee: Lianet Magrans > Commits during reconciliation always time out > - > > Key: KAFKA-16133 > URL: https://issues.apache.org/jira/browse/KAFKA-16133 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, reconciliation, timeout > Fix For: 3.8.0 > > > This only affects the AsyncKafkaConsumer, which is in Preview in 3.7. > In MembershipManagerImpl there is a confusion between timeouts and deadlines. > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L836C38-L836C38] > This causes all autocommits during reconciliation to immediately time out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15847) Consider partial metadata requests for client reconciliation
[ https://issues.apache.org/jira/browse/KAFKA-15847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15847: -- Labels: consumer-threading-refactor reconciliation (was: consumer-threading-refactor) > Consider partial metadata requests for client reconciliation > > > Key: KAFKA-15847 > URL: https://issues.apache.org/jira/browse/KAFKA-15847 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, reconciliation > Fix For: 3.8.0 > > > The new consumer implementing the KIP-848 protocol needs to resolve metadata for the > topics received in the assignment. It does so by relying on the centralized > metadata object. Currently, metadata updates requested through the metadata > object request metadata for all topics. Consider allowing the partial > updates that are already expressed as an intention in the Metadata class but > not fully supported (investigate the background in case there were some specifics > that led to this intention not being fully implemented). -- This message was sent by Atlassian Jira (v8.20.10#820010)
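The ticket suggests considering a narrower metadata request. A hypothetical sketch of that idea: compute which assigned topic IDs have no known name yet, and request only those. `java.util.UUID` stands in for Kafka's own topic ID type, and `PartialMetadataSketch` is illustrative. Whether the Metadata class can issue such a scoped request is exactly what the ticket asks to investigate.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper: scope a metadata request to the topic IDs that still
// need a name, instead of requesting metadata for every topic.
final class PartialMetadataSketch {
    static Set<UUID> topicsToRequest(Set<UUID> assignedTopicIds, Map<UUID, String> knownNames) {
        Set<UUID> missing = new HashSet<>(assignedTopicIds);
        missing.removeAll(knownNames.keySet());
        return missing;
    }
}
```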
[jira] [Updated] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes
[ https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16107: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Ensure consumer does not start fetching from added partitions until > onPartitionsAssigned completes > -- > > Key: KAFKA-16107 > URL: https://issues.apache.org/jira/browse/KAFKA-16107 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > In the new consumer implementation, when new partitions are assigned, the > subscription state is updated and then #onPartitionsAssigned is triggered. > This sequence seems sensible, but we need to ensure that no data is fetched > until onPartitionsAssigned completes (where the user could be setting the > committed offsets it wants to start fetching from). > We should pause the newly added partitions until > onPartitionsAssigned completes, similar to how it's done on revocation to > avoid positions getting ahead of the committed offsets. -- This message was sent by Atlassian Jira (v8.20.10#820010)
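The proposed ordering can be modelled with a hypothetical `AssignmentState` (not a Kafka class). Newly added partitions are marked as awaiting the callback, and are therefore not fetchable, until `onPartitionsAssigned` returns. In this sketch, a callback that throws leaves the partitions paused. How failures should be handled is an assumption; the ticket does not settle it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical assignment state: partitions added by reconciliation are not
// fetchable until the user's onPartitionsAssigned callback has returned.
final class AssignmentState {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> awaitingCallback = new HashSet<>();

    void assignAndNotify(Set<String> added, Consumer<Set<String>> onPartitionsAssigned) {
        assigned.addAll(added);
        awaitingCallback.addAll(added);      // paused: no fetching yet
        onPartitionsAssigned.accept(added);  // user may seek to committed offsets here
        awaitingCallback.removeAll(added);   // resumed only if the callback returned normally
    }

    boolean isFetchable(String partition) {
        return assigned.contains(partition) && !awaitingCallback.contains(partition);
    }
}
```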
[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15839: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Topic ID integration in consumer subscription state > --- > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > With the new consumer group protocol, assignments received by the consumer > contain topic IDs instead of topic names. Topic IDs are used in the > reconciliation path, integrated using TopicIdPartition. When reconciling, > topic names are resolved via a metadata update, but they are also kept in a > local #MembershipManager cache. This local cache serves the purpose of > keeping assigned topicId-names (which might have been removed from metadata, > e.g. when a topic is deleted). > That's just an initial step towards spreading topic IDs internally in the > consumer code. The next step, to be addressed with this task, would be to include topic > IDs in the subscription state, so that assigned topicId-names can be accessed > from other components without the need of resolving names multiple times. > Note that this task aims only at spreading topic IDs internally in the > consumer, with no changes to expose them at the API level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
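The cache behaviour described above works like this: resolve a topic ID through current metadata when possible, remember the name, and fall back to the remembered name once the topic has disappeared from metadata. The hypothetical sketch below implements that. `TopicNameResolver` is illustrative and uses `java.util.UUID` in place of Kafka's topic ID type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical resolver: prefer current metadata, but keep names of assigned
// topics so they can still be resolved after the topic leaves metadata.
final class TopicNameResolver {
    private final Map<UUID, String> assignedNames = new HashMap<>();

    Optional<String> resolve(UUID topicId, Map<UUID, String> currentMetadata) {
        String name = currentMetadata.get(topicId);
        if (name != null) {
            assignedNames.put(topicId, name); // remember for later lookups
            return Optional.of(name);
        }
        return Optional.ofNullable(assignedNames.get(topicId));
    }

    // Drop the cached name once the topic is no longer assigned.
    void forget(UUID topicId) {
        assignedNames.remove(topicId);
    }
}
```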
[jira] [Updated] (KAFKA-15835) Group commit/callbacks triggering logic
[ https://issues.apache.org/jira/browse/KAFKA-15835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15835: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Group commit/callbacks triggering logic > --- > > Key: KAFKA-15835 > URL: https://issues.apache.org/jira/browse/KAFKA-15835 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > The new consumer reconciliation logic triggers a commit request, revocation > callback and assignment callbacks sequentially to ensure that they are > executed in that order. This means that we could require multiple iterations > of the poll loop to complete reconciling an assignment. > We could consider triggering them all together, to be executed in the same > poll iteration, while still making sure that they are executed in the right > order. Note that the sequence sometimes should not block on failures (e.g. if > the commit fails, revocation proceeds anyway), and other times it does block (if > revocation callbacks fail, onPartitionsAssigned is not called). > As part of this task, review the time boundaries for the commit request > issued when the assignment changes. It will be effectively time-bounded by > the rebalance timeout enforced by the broker, so the initial approach is to use > the same rebalance timeout as the boundary on the client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
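One way to trigger all three reconciliation steps in KAFKA-15835 together while keeping their order is to chain them with CompletableFuture, as in the hypothetical sketch below. ReconciliationSteps and the Supplier/Runnable parameters are illustrative names, not the consumer's actual code. A failed commit is recovered so revocation still runs. A throwing revocation callback fails the chain, so the dependent onPartitionsAssigned stage is skipped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: all three steps are triggered together, yet each stage
// only starts after the previous one completes.
class ReconciliationSteps {
    static CompletableFuture<Void> reconcile(Supplier<CompletableFuture<Void>> commit,
                                             Runnable onPartitionsRevoked,
                                             Runnable onPartitionsAssigned) {
        return commit.get()
                // A failed commit must not block revocation: recover with null.
                .exceptionally(commitError -> null)
                // If the revocation callback throws, the returned future fails...
                .thenRun(onPartitionsRevoked)
                // ...and this dependent stage is skipped, so onPartitionsAssigned never runs.
                .thenRun(onPartitionsAssigned);
    }
}
```

A real implementation would also bound the commit by the rebalance timeout, as the ticket notes. The sketch leaves that out.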
[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15954: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Review minimal effort approach on consumer last heartbeat on unsubscribe > > > Key: KAFKA-15954 > URL: https://issues.apache.org/jira/browse/KAFKA-15954 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Currently both the legacy and the new consumer follow a minimal effort approach when > sending a leave group (legacy) or last heartbeat request (new consumer). The > request is sent without waiting for or handling any response. This behaviour applies > when the consumer is being closed or when it unsubscribes. > When the consumer is being closed (which is a "terminal" > state), it makes sense to just follow a minimal effort approach for > "properly" leaving the group. But for unsubscribe, it may > make sense to put a little more effort into making sure that the last heartbeat > is sent and received by the broker. Note that unsubscribe could be a temporary > state, where the consumer might want to re-join the group at any time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
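The two levels of effort discussed in KAFKA-15954 can be contrasted in a small hypothetical sketch. LeaveGroupEffort is invented, and sendLastHeartbeat is an assumed stand-in for whatever sends the request and exposes its response as a future. On close the request is fired and forgotten. On unsubscribe the caller waits, with a time bound, for an acknowledgement.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch; sendLastHeartbeat stands in for whatever sends the
// leave-group/last heartbeat request and exposes the response as a future.
class LeaveGroupEffort {
    // close(): minimal effort, fire the request and do not wait for a response.
    static void leaveOnClose(Supplier<CompletableFuture<Void>> sendLastHeartbeat) {
        sendLastHeartbeat.get();
    }

    // unsubscribe(): wait, bounded by timeoutMs, for the broker to acknowledge
    // the last heartbeat. Returns true if a successful response arrived in time.
    static boolean leaveOnUnsubscribe(Supplier<CompletableFuture<Void>> sendLastHeartbeat,
                                      long timeoutMs) {
        try {
            sendLastHeartbeat.get().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Blocking on a future is only for brevity here. Inside the background thread a real implementation would track the response asynchronously instead.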
[jira] [Updated] (KAFKA-15832) Trigger client reconciliation based on manager poll
[ https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15832: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Trigger client reconciliation based on manager poll > --- > > Key: KAFKA-15832 > URL: https://issues.apache.org/jira/browse/KAFKA-15832 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Currently the reconciliation logic on the client is triggered when a new > target assignment is received and resolved, or when new unresolved target > assignments are discovered in metadata. > This could be improved by triggering the reconciliation logic on each poll > iteration, to reconcile whatever is ready to be reconciled. This would > require changes to support poll on the MembershipManager, and integrate it > with the current polling logic in the background thread. Receiving a new > target assignment from the broker, or resolving new topic names via a > metadata update could only ensure that the #assignmentReadyToReconcile is > properly updated (currently done), but wouldn't trigger the #reconcile() > logic, leaving that to the #poll() operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
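The poll-driven approach proposed in KAFKA-15832 reduces to a flag-plus-poll pattern, sketched below. This is hypothetical: PollDrivenReconciler and its methods are illustrative names that only echo the ticket's #assignmentReadyToReconcile and #reconcile(). Events that make an assignment ready only set the flag, and reconciliation happens solely inside poll().

```java
// Hypothetical sketch of poll-driven reconciliation; names echo the ticket
// but this is not the real MembershipManager.
class PollDrivenReconciler {
    private boolean assignmentReadyToReconcile = false;
    private int reconciliations = 0;

    // New target assignment received, or topic names resolved by a metadata
    // update: only record that something is ready, do not reconcile here.
    void onTargetAssignmentResolved() {
        assignmentReadyToReconcile = true;
    }

    // Called on every iteration of the background thread's poll loop.
    void poll() {
        if (assignmentReadyToReconcile) {
            assignmentReadyToReconcile = false;
            reconcile();
        }
    }

    private void reconcile() {
        reconciliations++; // stands in for the actual reconciliation logic
    }

    int reconciliations() {
        return reconciliations;
    }
}
```

A side effect of this design is that several triggers between two polls collapse into a single reconciliation.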
[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15843: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand if it > is really needed to call onPartitionsAssigned with empty assignment (or if it > should behave consistently with the onRevoke/Lost). > Note that the consumer integration tests rely on this call to > onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15843: -- Labels: callback kip-848-client-support reconciliation (was: kip-848-client-support reconciliation) > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: callback, kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand if it > is really needed to call onPartitionsAssigned with empty assignment (or if it > should behave consistently with the onRevoke/Lost). > Note that the consumer integration tests rely on this call to > onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala) -- This message was sent by Atlassian Jira (v8.20.10#820010)
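The asymmetry that KAFKA-15843 asks to review can be modelled as in the hypothetical sketch below. EmptyAssignmentCallbacks is an invented class, not Kafka's ConsumerRebalanceListener machinery. onPartitionsAssigned fires even for an empty set, while the revoked callback is skipped when there is nothing to revoke.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the callback rule; not Kafka's ConsumerRebalanceListener.
class EmptyAssignmentCallbacks {
    final List<String> invoked = new ArrayList<>();

    // Revoked (and lost) callbacks are skipped when there is nothing to report.
    void maybeInvokeRevoked(Set<String> revoked) {
        if (!revoked.isEmpty()) {
            invoked.add("onPartitionsRevoked" + revoked);
        }
    }

    // onPartitionsAssigned is always invoked, even with an empty set; tests such
    // as awaitRebalance can then treat the call as a "rebalance completed" signal.
    void invokeAssigned(Set<String> added) {
        invoked.add("onPartitionsAssigned" + added);
    }
}
```

Making the assigned callback consistent with revoked/lost would remove that implicit signal. Tests relying on it would then need another way to detect a completed rebalance.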
[jira] [Updated] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed
[ https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15588: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Purge the unsent offset commits/fetches when the member is fenced/failed > > > Key: KAFKA-15588 > URL: https://issues.apache.org/jira/browse/KAFKA-15588 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > When the member is fenced/failed, we should purge the inflight offset commits > and fetches. HeartbeatRequestManager should be able to handle this -- This message was sent by Atlassian Jira (v8.20.10#820010)
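Purging unsent requests on fencing, as KAFKA-15588 describes, might look like the hypothetical sketch below. UnsentOffsetRequests is an invented name, and each queued request is reduced to the future its caller waits on. When the member is fenced or fails, every pending future is failed and the queue is emptied, so nothing is sent later on behalf of a stale member.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: each unsent offset commit/fetch is represented only by
// the future its caller is waiting on.
class UnsentOffsetRequests {
    private final Deque<CompletableFuture<Void>> unsent = new ArrayDeque<>();

    CompletableFuture<Void> enqueue() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        unsent.add(future);
        return future;
    }

    int size() {
        return unsent.size();
    }

    // Called, e.g. by the heartbeat logic, when the member is fenced or fails.
    void purge(RuntimeException cause) {
        CompletableFuture<Void> future;
        while ((future = unsent.poll()) != null) {
            future.completeExceptionally(cause);
        }
    }
}
```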
[jira] [Updated] (KAFKA-15321) Document consumer group member state machine
[ https://issues.apache.org/jira/browse/KAFKA-15321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15321: -- Labels: kip-848-client-support reconciliation (was: kip-848-client-support) > Document consumer group member state machine > > > Key: KAFKA-15321 > URL: https://issues.apache.org/jira/browse/KAFKA-15321 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Minor > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > We need to first document the new consumer group member state machine. What > are the different states and what are the transitions? > See [~pnee]'s notes: > [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design] > *_Don’t forget to include diagrams for clarity!_* > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16103: -- Labels: kip-848-client-support offset (was: kip-848-client-support) > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
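The requirement in KAFKA-16103 that all commit callbacks fire before commitSync returns can be sketched under several assumptions. CommitCallbackQueue is hypothetical. Async commit responses are assumed to arrive on a background thread, while user callbacks are assumed to run on the application thread. Waiting for async commits that are still in flight is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: completed commitAsync callbacks are queued by the
// background thread and drained on the application thread.
class CommitCallbackQueue {
    private final Queue<Runnable> completed = new ConcurrentLinkedQueue<>();
    // Only touched on the application thread.
    final List<String> log = new ArrayList<>();

    // Background thread: a commitAsync response was received.
    void onAsyncCommitResponse(String commitId) {
        completed.add(() -> log.add("callback " + commitId));
    }

    // Application thread: run every queued callback.
    void invokeCompletedCallbacks() {
        Runnable callback;
        while ((callback = completed.poll()) != null) {
            callback.run();
        }
    }

    // Application thread: once the synchronous commit completes, trigger all
    // pending callbacks before returning to the caller.
    void commitSync() {
        log.add("commitSync completed");
        invokeCompletedCallbacks();
    }
}
```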
[jira] [Updated] (KAFKA-16032) Review client errors thrown on OffsetFetch and OffsetCommit failures
[ https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16032: -- Labels: kip-848-client-support offset (was: kip-848-client-support) > Review client errors thrown on OffsetFetch and OffsetCommit failures > > > Key: KAFKA-16032 > URL: https://issues.apache.org/jira/browse/KAFKA-16032 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > OffsetFetch and OffsetCommit handle errors separately. There are some issues > to review around this, to ensure that we keep propagating the same exceptions > as the legacy consumer: > 1. Ensure the same behaviour as the legacy coordinator for expected commit errors: > propagating the error.exception(), or CommitFailedException, depending on the > error (e.g. commit requests that fail with FENCED_INSTANCE_ID or > UNKNOWN_MEMBER_ID should throw a CommitFailedException instead of the error > specific ones). > 2. Ensure the same behaviour as the legacy coordinator for unexpected errors: all > unexpected errors should fail with KafkaException, even if retriable. > 3. Consider whether we could unify the error handling for both commit and fetch. > Downside: even though there are common errors, some are not > handled the same way in both requests (e.g. UNKNOWN_TOPIC_OR_PARTITION throws a > non-retriable KafkaException for fetch requests, but a retriable > UnknownTopicOrPartitionException for commit requests). -- This message was sent by Atlassian Jira (v8.20.10#820010)
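Points 1 and 2 of KAFKA-16032 amount to an error-to-exception mapping. The hypothetical sketch below models that mapping with local stand-in types (CommitErrorCode, CommitFailedStandIn, KafkaStandIn), which are not Kafka's Errors, CommitFailedException or KafkaException. The choice of which codes count as "expected" is illustrative only.

```java
// Hypothetical stand-ins: these are NOT Kafka's Errors, CommitFailedException
// or KafkaException types, and which codes count as "expected" is illustrative.
class CommitFailedStandIn extends RuntimeException {
    CommitFailedStandIn(String message) { super(message); }
}

class KafkaStandIn extends RuntimeException {
    KafkaStandIn(String message) { super(message); }
}

enum CommitErrorCode {
    FENCED_INSTANCE_ID(true),
    UNKNOWN_MEMBER_ID(true),
    GROUP_AUTHORIZATION_FAILED(true),
    UNKNOWN_SERVER_ERROR(false);

    final boolean expected;

    CommitErrorCode(boolean expected) { this.expected = expected; }

    // Stand-in for error.exception(): the error-specific exception.
    RuntimeException exception() { return new RuntimeException(name()); }
}

class CommitErrorMapper {
    static RuntimeException toClientException(CommitErrorCode error) {
        switch (error) {
            case FENCED_INSTANCE_ID:
            case UNKNOWN_MEMBER_ID:
                // Point 1: these surface as a commit failure, not the specific error.
                return new CommitFailedStandIn("Commit cannot be completed: " + error);
            default:
                // Point 1 for other expected errors; point 2 for unexpected ones,
                // which are wrapped even if they are retriable.
                return error.expected
                        ? error.exception()
                        : new KafkaStandIn("Unexpected error in commit: " + error);
        }
    }
}
```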
[jira] [Updated] (KAFKA-16033) Review client retry logic of OffsetFetch and OffsetCommit responses
[ https://issues.apache.org/jira/browse/KAFKA-16033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16033: -- Labels: kip-848-client-support offset (was: kip-848-client-support) > Review client retry logic of OffsetFetch and OffsetCommit responses > --- > > Key: KAFKA-16033 > URL: https://issues.apache.org/jira/browse/KAFKA-16033 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > The retry logic for OffsetFetch and OffsetCommit requests lives in the > CommitRequestManager, and applies to requests issued from multiple components > (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for > the regular auto-commits, MembershipManager for auto-commits before > rebalance, and auto-commit before closing the consumer). While this approach helps to > avoid having the retry logic in each caller, currently the CommitManager has > it in different places and it ends up being rather hard to follow. > This task aims at reviewing the retry logic from a high-level perspective > (multiple callers, with retry needs that have similarities and differences at > the same time). So the review should assess the similarities vs differences, > and then consider two options: > 1. Keep retry logic centralized in the CommitManager, but fixed in a more > consistent way, applied the same way for all requests, depending on the > intention expressed by the caller. The advantage of this approach (current > approach + improvement) is that callers that require the same retry logic > could reuse it, keeping it in a single place (e.g. commitSync from the > consumer retries in the same way as the auto-commit before rebalance). > 2. Move retry logic to the caller.
This aligns with the way it was done on > the legacy coordinator, but the main challenge seems to be not duplicating > the retry logic in callers that require the same. > This task will also review what exceptions are indeed retried on the > OffsetCommit and OffsetFetch, considering that the legacy implementation only > retries on some expected Retriable errors (not all) -- This message was sent by Atlassian Jira (v8.20.10#820010)
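Option 1 in KAFKA-16033 (retry logic centralized, with callers only stating their intent) could be shaped like the hypothetical sketch below. CentralizedCommitRetry and its parameters are invented. A caller passes a deadline and a predicate deciding which errors are retriable, and a caller that wants no retries passes a deadline equal to "now".

```java
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of option 1: one retry helper owned by the commit
// manager. Callers only express intent: a deadline (retry until then) or a
// deadline equal to "now" (no retries). The retriable check is caller-supplied.
class CentralizedCommitRetry {
    static <T> T runWithRetries(Supplier<T> attempt,
                                Predicate<RuntimeException> isRetriable,
                                LongSupplier nowMs,
                                long deadlineMs) {
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                if (!isRetriable.test(e) || nowMs.getAsLong() >= deadlineMs) {
                    throw e;
                }
                // A real client would apply a retry backoff here before the next attempt.
            }
        }
    }
}
```

Option 2 would instead inline a loop like this in each caller, which is exactly the duplication the ticket worries about.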
[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15283: -- Labels: kip-848-client-support newbie offset (was: kip-848-client-support newbie) > Client support for OffsetFetch and OffsetCommit with topic ID > - > > Key: KAFKA-15283 > URL: https://issues.apache.org/jira/browse/KAFKA-15283 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Minor > Labels: kip-848-client-support, newbie, offset > Fix For: 3.8.0 > > > Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory > {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and > {{METADATA}} RPC calls. > With KIP-848, OffsetFetch and OffsetCommit will start using topic IDs in > the same way, so the new client implementation will provide them when issuing > those requests. Topic names should continue to be supported as needed by the > {{AdminClient}}. > We should also review/clean up the support for topic names in requests such > as the {{METADATA}} request (currently supporting topic names as well as > topic IDs on the client side). > Tasks include: > * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will > be upgraded on the server to support topic ID > * Check topic ID propagation internally in the client based on RPCs > including it. > * Review existing support for topic names for potential clean-up if not needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic
[ https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16004: -- Labels: consumer-threading-refactor offset (was: consumer-threading-refactor kip-848-client-support) > Review new consumer inflight offset commit logic > > > Key: KAFKA-16004 > URL: https://issues.apache.org/jira/browse/KAFKA-16004 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, offset > Fix For: 3.8.0 > > > New consumer logic for committing offsets handles inflight requests, to > ensure that no commit request is sent if a previous one hasn't received a > response. Review how that logic is currently applied to both sync and async > commits, and validate it against the legacy coordinator, which seems to apply it > only to async commits. Also review the behaviour for auto-commit > (the expected behaviour for auto-commit on the interval, > auto-commit on reconciliation, and auto-commit on close). -- This message was sent by Atlassian Jira (v8.20.10#820010)
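The in-flight guard reviewed in KAFKA-16004 can be reduced to the hypothetical sketch below. InflightCommitGuard and its methods are invented names. A new commit is only sent once the previous one has received a response; a real implementation would also have to decide what happens to the skipped commit (retry later, merge offsets, or fail).

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: at most one commit in flight at a time.
class InflightCommitGuard {
    private CompletableFuture<Void> inflight = CompletableFuture.completedFuture(null);
    private int sent = 0;

    // Returns true if a request was sent, false if the previous one is still
    // awaiting a response.
    boolean maybeSendAsyncCommit() {
        if (!inflight.isDone()) {
            return false;
        }
        inflight = new CompletableFuture<>();
        sent++;
        return true;
    }

    // Called when the response for the in-flight commit arrives.
    void onResponse() {
        inflight.complete(null);
    }

    int sentCount() {
        return sent;
    }
}
```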
[jira] [Updated] (KAFKA-15942) Implement ConsumerInterceptor
[ https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15942: -- Labels: consumer-threading-refactor interceptors (was: consumer-threading-refactor) > Implement ConsumerInterceptor > - > > Key: KAFKA-15942 > URL: https://issues.apache.org/jira/browse/KAFKA-15942 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, interceptors > Fix For: 3.8.0 > > > As the title says, we need to implement ConsumerInterceptor in the AsyncKafkaConsumer. > > This is the current code; the implementation would be very similar: > {code:java} > if (interceptors != null) > interceptors.onCommit(offsets); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
[ https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15867: -- Labels: consumer-threading-refactor events (was: consumer-threading-refactor) > Should ConsumerNetworkThread wrap the exception and notify the polling thread? > -- > > Key: KAFKA-15867 > URL: https://issues.apache.org/jira/browse/KAFKA-15867 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The ConsumerNetworkThread runs a tight loop indefinitely. However, when > encountering an unexpected exception, it logs an error and continues. > > I think this might not be ideal because the user can run blind for a long time > before discovering there's something wrong with the code, so I believe we > should propagate the throwable back to the polling thread. > > cc [~lucasbru] -- This message was sent by Atlassian Jira (v8.20.10#820010)
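Propagating the background error as KAFKA-15867 suggests could be done by recording the error and rethrowing it on the application thread, as in the hypothetical sketch below. BackgroundErrorRelay is an invented class, and wrapping in a plain RuntimeException is an assumption (a real client would likely use its own exception type). Only RuntimeExceptions are caught here, to avoid silently continuing after Errors such as OutOfMemoryError.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the network thread records the first unexpected error
// instead of only logging it, and the application thread rethrows it on its
// next API call. The wrapper type is an assumption (a plain RuntimeException).
class BackgroundErrorRelay {
    private final AtomicReference<RuntimeException> firstError = new AtomicReference<>();

    // One iteration of the background loop.
    void runOnce(Runnable work) {
        try {
            work.run();
        } catch (RuntimeException e) {
            // Keep the first error; later ones are often consequences of it.
            firstError.compareAndSet(null, e);
        }
    }

    // Called at the start of application-thread methods such as poll().
    void maybeThrowBackgroundError() {
        RuntimeException e = firstError.getAndSet(null);
        if (e != null) {
            throw new RuntimeException("Unexpected error in the background thread", e);
        }
    }
}
```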
[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15551: -- Labels: consumer-threading-refactor events (was: consumer-threading-refactor) > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > Calls with empty inputs, such as: > * Committing empty offsets > * Fetching offsets for an empty set of partitions > * Getting positions for an empty set of topic partitions > should possibly be short-circuited at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
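The short-circuit proposed in KAFKA-15551 is simply an early return for empty input, as the hypothetical sketch below shows. ShortCircuitCommitter is invented, offsets are modelled as a partition-name to offset map, and "enqueue an event" is reduced to a counter. An empty commit completes immediately without creating an event or touching the network.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; offsets are modelled as partition name -> offset, and
// "enqueue an event" is just a counter.
class ShortCircuitCommitter {
    private int eventsEnqueued = 0;

    CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete immediately, no event, no network I/O.
            return CompletableFuture.completedFuture(null);
        }
        eventsEnqueued++; // stands in for handing a commit event to the background thread
        return new CompletableFuture<>();
    }

    int eventsEnqueued() {
        return eventsEnqueued;
    }
}
```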
[jira] [Updated] (KAFKA-15320) Document event queueing patterns
[ https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15320: -- Labels: consumer-threading-refactor events (was: consumer-threading-refactor) > Document event queueing patterns > > > Key: KAFKA-15320 > URL: https://issues.apache.org/jira/browse/KAFKA-15320 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > We need to first document the event enqueuing patterns in the > PrototypeAsyncConsumer. As part of this task, determine if it’s > necessary/beneficial to _conditionally_ add events and/or coalesce any > duplicate events in the queue. > _Don’t forget to include diagrams for clarity!_ > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250: -- Labels: consumer-threading-refactor events (was: consumer-threading-refactor) > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to re-examine the timeout passed to networkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
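A common way to avoid the tight loop described in KAFKA-15250 is to let the network poll block until the earliest moment any component needs to act, capped by a maximum. The hypothetical sketch below shows only that timeout computation. BackgroundPollTimeout is an invented name, and the deadlines (e.g. next heartbeat, next auto-commit) are assumed inputs.

```java
import java.util.List;

// Hypothetical sketch: compute how long the background thread may block in
// its network poll, instead of spinning with a zero or tiny timeout.
class BackgroundPollTimeout {
    // nextDeadlinesMs: absolute times (ms) at which each request manager next
    // needs to run, e.g. the next heartbeat or auto-commit.
    static long pollTimeoutMs(long nowMs, List<Long> nextDeadlinesMs, long maxPollTimeoutMs) {
        long timeout = maxPollTimeoutMs;
        for (long deadline : nextDeadlinesMs) {
            // Overdue deadlines yield 0 so the loop runs again immediately.
            timeout = Math.min(timeout, Math.max(0L, deadline - nowMs));
        }
        return timeout;
    }
}
```

For example, at time 1000 with deadlines at 1300 and 1050 and a cap of 5000, the thread would block for at most 50 ms rather than looping continuously.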
[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15173: -- Labels: consumer-threading-refactor events (was: consumer-threading-refactor) > Consumer event queues should be bounded > --- > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is unbounded, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues? Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound? > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and use the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
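Questions 1 and 2 of KAFKA-15173 could be answered in the simplest way sketched below: a fixed event-count bound and a fail-fast exception when the queue is full. The sketch is hypothetical. BoundedEventQueue and EventQueueFullException are invented names, not Kafka types, and the capacity is whatever constant the caller picks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical exception type standing in for the "new exception type" the
// ticket suggests; it is not part of Kafka.
class EventQueueFullException extends RuntimeException {
    EventQueueFullException(String message) { super(message); }
}

// Hypothetical bounded event queue; the capacity would initially be an
// arbitrary constant, as the ticket proposes.
class BoundedEventQueue<E> {
    private final BlockingQueue<E> queue;
    private final int capacity;

    BoundedEventQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: fail fast and tell the user instead of growing without bound.
    void add(E event) {
        if (!queue.offer(event)) {
            throw new EventQueueFullException("Event queue is full (capacity " + capacity + ")");
        }
    }

    // Returns null when the queue is empty.
    E poll() {
        return queue.poll();
    }
}
```

Failing fast is only one option. Blocking the caller with a timeout (BlockingQueue.offer with a timeout) would be an alternative for the application event queue.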
[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
[ https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15635: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric > - > > Key: KAFKA-15635 > URL: https://issues.apache.org/jira/browse/KAFKA-15635 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there > is only 1 assigned partition? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{setWakeupHook}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{networkClientDelegate.disconnectAsync}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14724: -- Labels: consumer-threading-refactor fetcher kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848-e2e kip-848-preview) > Port tests in FetcherTest to FetchRequestManagerTest > > > Key: KAFKA-14724 > URL: https://issues.apache.org/jira/browse/KAFKA-14724 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, fetcher, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{Fetcher}}. > This task involves copying the relevant tests from {{FetcherTest}} and > modifying them to fit a new unit test named {{FetchRequestManagerTest}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set<TopicPartition> partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy
[ https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15641: -- Labels: fetcher (was: ) > Investigate CompletedFetch handleInitializeErrors for accuracy > -- > > Key: KAFKA-15641 > URL: https://issues.apache.org/jira/browse/KAFKA-15641 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: fetcher > > The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named > testFetchedRecordsAfterSeek, which [upon closer > inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] > may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}. > Here is the test code: > {code:java} > @Test > public void testFetchedRecordsAfterSeek() { > buildFetcher(OffsetResetStrategy.NONE, > new ByteArrayDeserializer(), > new ByteArrayDeserializer(), > 2, > IsolationLevel.READ_UNCOMMITTED); > assignFromUser(singleton(tp0)); > // Step 1: seek to offset 0 of our partition. > subscriptions.seek(tp0, 0); > // Step 2: issue a mock broker request to fetch data from the current > offset in our local state, > // i.e. offset 0. > assertTrue(sendFetches() > 0); > // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker. > client.prepareResponse(fullFetchResponse(tidp0, records, > Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); > // Step 4: process the network I/O to receive the response from the > broker with the OFFSET_OUT_OF_RANGE > // that was injected. Note, however, that we haven't "collected" the > fetch data included in the response. > networkClientDelegate.poll(time.timer(0)); > // Step 5: validate that the partition is not marked as needing its > offset reset. The response validation > // logic is performed during the fetch collection, which doesn't happen > until assertEmptyFetch below. 
> assertFalse(subscriptions.isOffsetResetNeeded(tp0)); > // Step 6: update the partition's position in our local state to offset > 2. We still haven't collected the > // fetch, so we haven't performed any validation of the fetch response. > subscriptions.seek(tp0, 2); > // Step 7: perform the fetch collection. As part of that process, error > handling is performed. Since > // we intentionally injected an error above, this error will be checked > and handled in the > // FetchCollector.handleInitializeErrors method. When handling > OFFSET_OUT_OF_RANGE, handleInitializeErrors > // will notice that the original requested offset (0) is different from > the state of our current offset (2). > assertEmptyFetch("Should not return records or advance position after > seeking to end of topic partition"); > } > {code} > Here is the code from {{FetchCollector.handleInitializeErrors}}: > {code:java} > private void handleInitializeErrors(final CompletedFetch completedFetch, > final Errors error) { > final TopicPartition tp = completedFetch.partition; > final long fetchOffset = completedFetch.nextFetchOffset(); > . . . 
> if (error == Errors.OFFSET_OUT_OF_RANGE) { > Optional<Integer> clearedReplicaId = > subscriptions.clearPreferredReadReplica(tp); > if (!clearedReplicaId.isPresent()) { > // If there's no preferred replica to clear, we're fetching from > the leader so handle > // this error normally > SubscriptionState.FetchPosition position = > subscriptions.position(tp); > if (position == null || fetchOffset != position.offset) { > log.debug("Discarding stale fetch response for partition {} > since the fetched offset {} " + > "does not match the current offset {}", tp, > fetchOffset, position); > } else { > String errorMessage = "Fetch position " + position + " is out > of range for partition " + tp; > if (subscriptions.hasDefaultOffsetResetPolicy()) { > log.info("{}, resetting offset", errorMessage); > subscriptions.requestOffsetReset(tp); > } else { > log.info("{}, raising error to the application since no > reset policy is configured", > errorMessage); > throw new OffsetOutOfRangeException(errorMessage, > Collections.singletonMap(tp, position.offset)); > } > } > } else { > log.debug("Unset the preferred read replica {} for partition {} > since we got {} when fetching {}", > clearedReplicaI
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Labels: consumer-threading-refactor fetcher (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, fetcher > Fix For: 4.0.0 > > > {{expectedBytes}} is calculated as total, instead of avg. Is this correct?
[jira] [Updated] (KAFKA-15971) Re-enable consumer integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15971: -- Labels: consumer-threading-refactor kip-848 kip-848-preview system-tests (was: consumer-threading-refactor kip-848 kip-848-preview) > Re-enable consumer integration tests for new consumer > - > > Key: KAFKA-15971 > URL: https://issues.apache.org/jira/browse/KAFKA-15971 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, kip-848, kip-848-preview, > system-tests > Fix For: 3.7.0 > > > Re-enable the consumer integration tests for the new consumer making sure > that build stability is not impacted.
[jira] [Updated] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15932: -- Labels: consumer-threading-refactor flaky-test kip-848 kip-848-client-support system-tests (was: consumer-threading-refactor flaky-test kip-848 kip-848-client-support) > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > --- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, flaky-test, kip-848, > kip-848-client-support, system-tests > Fix For: 3.7.0 > > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. 
> at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161) > at > app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128) > at > app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(Throw
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16150: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe > > > Key: KAFKA-16150 > URL: https://issues.apache.org/jira/browse/KAFKA-16150 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16011) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose
[ https://issues.apache.org/jira/browse/KAFKA-16011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16011: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose > > > Key: KAFKA-16011 > URL: https://issues.apache.org/jira/browse/KAFKA-16011 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3). 
Instead, got ArrayBuffer(Set(topic1-0, topic-0, > topic-1), Set(), Set(topic1-1, topic1-5)) > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1865) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnClose(PlaintextConsumerTest.scala:1277) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:376) > [2023-12-13 15:33:33,180] ERROR [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Member JQ_e0S5FTzKnYyStB3aBrQ with epoch 0 transitioned to > FATAL state > (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:456) > [2023-12-13 15:33:33,212] ERROR [daemon-consumer-assignment]: Error due to > (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:139) > org.apache.kafka.common.errors.InvalidRequestException: RebalanceTimeoutMs > must be provided in first request. > [2023-12-13 15:33:39,196] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:33:39,200] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15515: -- Labels: consumer-threading-refactor system-tests (was: consumer-threading-refactor) > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 3.7.0 > > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > created to catch regressions and validate functionality in the new consumer > while it was in development. It should be deleted once the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed against both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exists in the file > PlaintextConsumerTest. Those tests cannot be executed as-is against the new > consumer implementation, for two main reasons: > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > To be able to run the subset of tests that the new consumer > supports while the implementation is completed, it was decided to: > - make a copy of the `PlaintextConsumerTest` class, named > PlaintextAsyncConsumer. > - leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves.
[jira] [Updated] (KAFKA-15986) New consumer group protocol integration test failures
[ https://issues.apache.org/jira/browse/KAFKA-15986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15986: -- Labels: consumer-threading-refactor kip-848 kip-848-client-support system-tests (was: consumer-threading-refactor kip-848 kip-848-client-support) > New consumer group protocol integration test failures > - > > Key: KAFKA-15986 > URL: https://issues.apache.org/jira/browse/KAFKA-15986 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, system-tests > Fix For: 3.7.0 > > > A recent change in `AsyncKafkaConsumer.updateFetchPositions` has made > fetching fail without returning records in some situations.
[jira] [Updated] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
[ https://issues.apache.org/jira/browse/KAFKA-16152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16152: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart > -- > > Key: KAFKA-16152 > URL: https://issues.apache.org/jira/browse/KAFKA-16152 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0
[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid
[ https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15991: -- Labels: consumer-threading-refactor flaky-test kip-848 kip-848-client-support system-tests (was: consumer-threading-refactor flaky-test kip-848 kip-848-client-support) > Flaky new consumer test testGroupIdNotNullAndValid > -- > > Key: KAFKA-15991 > URL: https://issues.apache.org/jira/browse/KAFKA-15991 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: Bruno Cadonna >Priority: Major > Labels: consumer-threading-refactor, flaky-test, kip-848, > kip-848-client-support, system-tests > Fix For: 3.7.0 > > > Fails locally when run in a loop with its latest changes from > [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988]. > It failed the build, so it was temporarily disabled.
[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16167: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: consumer-threading-refactor kip-848-client-support) > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0
[jira] [Updated] (KAFKA-14830) Illegal state error in transactional producer
[ https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14830: -- Component/s: clients producer > Illegal state error in transactional producer > - > > Key: KAFKA-14830 > URL: https://issues.apache.org/jira/browse/KAFKA-14830 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.1.2 >Reporter: Jason Gustafson >Assignee: Kirk True >Priority: Major > > We have seen the following illegal state error in the producer: > {code:java} > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-0:120027 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-1:120026 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Aborting > incomplete transaction > [Producer clientId=client-id2, transactionalId=transactional-id] Invoking > InitProducerId with current producer ID and epoch > ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch > [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId > set to 191799 with epoch 1 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.NetworkException: Disconnected from node 4 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught > error in request completion: > java.lang.IllegalStateException: TransactionalId transactional-id: Invalid > transition attempted from state READY to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734) > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) > at > 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base/java.lang.Thread.run(Thread.java:829) > {code} > The producer hits timeouts which cause it to abort an active transaction. > After aborting, the producer bumps its epoch, which transitions it back to > the `READY` state. Following this, there are two errors for inflight > requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But > how could the transaction ABORT complete if there were still inflight > requests?
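The sequence described above can be reproduced with a deliberately simplified, hypothetical state machine. It is not Kafka's `TransactionManager`: the real client also has states such as `UNINITIALIZED`, `INITIALIZING`, `COMMITTING_TRANSACTION` and `FATAL_ERROR`, and its transition rules differ in detail. This sketch models only the states named in the log, folds the epoch bump into `ABORTING_TRANSACTION -> READY`, and shows why a batch failure that arrives after the abort has finished is rejected with an `IllegalStateException`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of a transactional producer's state machine (not Kafka's TransactionManager). */
final class TxnStateModel {

    /** Only the states named in the log above; the real client has more. */
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, ABORTING_TRANSACTION }

    // For each target state, the set of states it may be entered from.
    private static final Map<State, Set<State>> VALID_SOURCES = new EnumMap<>(State.class);

    static {
        // Assumption: the epoch bump after an abort is folded into ABORTING_TRANSACTION -> READY.
        VALID_SOURCES.put(State.READY, EnumSet.of(State.ABORTING_TRANSACTION));
        VALID_SOURCES.put(State.IN_TRANSACTION, EnumSet.of(State.READY));
        VALID_SOURCES.put(State.ABORTABLE_ERROR, EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR));
        VALID_SOURCES.put(State.ABORTING_TRANSACTION, EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR));
    }

    private State current = State.READY;

    State current() {
        return current;
    }

    /** Moves to {@code target}, or throws and leaves the state unchanged if the edge is not allowed. */
    void transitionTo(State target) {
        if (!VALID_SOURCES.get(target).contains(current)) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + current + " to state " + target);
        }
        current = target;
    }
}
```

Walking the model through IN_TRANSACTION, ABORTABLE_ERROR (the expired batches), ABORTING_TRANSACTION and back to READY, and then reporting one more failed in-flight batch, raises the same kind of error as the stack trace. In this model the exception is only the symptom; the open question in the issue is why the abort was allowed to complete while requests were still in flight.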
[jira] [Updated] (KAFKA-13703) OAUTHBEARER client will not use defined truststore
[ https://issues.apache.org/jira/browse/KAFKA-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-13703: -- Labels: oauth (was: ) > OAUTHBEARER client will not use defined truststore > -- > > Key: KAFKA-13703 > URL: https://issues.apache.org/jira/browse/KAFKA-13703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.1.0 >Reporter: Adam Long >Assignee: Kirk True >Priority: Major > Labels: oauth > > I am developing a Kafka client that uses OAUTHBEARER and SSL to connect. I'm > attempting to test against a server using a key from a custom CA. I added > the trust-chain for the server to a Truststore JKS file, and referenced it in > the configuration. However, I continually get PKIX errors. After some code > tracing, I believe the OAUTHBEARER client code ignores defined truststores. > Here is an example based on my configuration: > {code:java} > application.id=my-kafka-client > client.id=my-kafka-client > group.id=my-kafka-client > # OAuth/SSL listener > bootstrap.servers=:9096 > security.protocol=SASL_SSL > # OAuth Configuration > sasl.mechanism=OAUTHBEARER > sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler > sasl.login.connect.timeout.ms=15000 > sasl.oauthbearer.token.endpoint.url=https:///auth/realms//protocol/openid-connect/token > ssl.truststore.location=\kafka.truststore.jks > #ssl.truststore.password=changeit > sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule > required \ > clientId="my-kafka-client" \ > clientSecret="my-kafka-client-secret"; > {code} > Note - my Truststore does not have a password (I initially tried setting one to see if > that would solve the problem).
> I'm using the following example test code: > {code:java} > package example; > import java.io.IOException; > import java.net.URISyntaxException; > import java.util.Properties; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.common.serialization.StringDeserializer; > import org.apache.kafka.common.serialization.StringSerializer; > public class Main { >public static void main(final String[] args) throws IOException, > URISyntaxException { > Properties config = new Properties(); > > config.load(Main.class.getClassLoader().getResourceAsStream("client.conf")); > config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class); > config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class); > > final KafkaConsumer<String, String> consumer = new > KafkaConsumer<>(config); >} > } > {code} > The issue seems to be in the > {{org.apache.kafka.common.security.oauthbearer.secured}} package - in > particular the {{AccessTokenRetrieverFactory.create()}} method, as it creates > an sslContext but does not include the configured truststore from the Kafka > configuration. > As such, it appears that unless you alter the JVM-default truststore, you > cannot connect to a server running a custom trust-chain.
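What the reporter expects the token-endpoint client to do can be sketched with standard JSSE APIs: load the configured truststore and build an `SSLContext` from it rather than from the JVM default. This is a minimal sketch, not Kafka's `AccessTokenRetrieverFactory`; the class `TruststoreSslContext` and its methods are hypothetical names. It accepts a null password because the reporter's JKS truststore has none.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper: build an SSLContext that trusts only the entries of a configured truststore. */
final class TruststoreSslContext {

    private TruststoreSslContext() {
    }

    /**
     * Loads a truststore from disk. For a JKS store, a null password skips the
     * integrity check, which matches a truststore created without a password.
     */
    static KeyStore loadTruststore(Path location, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore truststore = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(location)) {
            truststore.load(in, password);
        }
        return truststore;
    }

    /** Builds a TLS context whose trust managers are backed by the given truststore instead of the JVM default. */
    static SSLContext fromTruststore(KeyStore truststore) throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(truststore);
        SSLContext context = SSLContext.getInstance("TLS");
        // No key managers (no client certificate) and the default SecureRandom.
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }
}
```

The resulting context's `getSocketFactory()` could then be handed to something like `HttpsURLConnection.setSSLSocketFactory(...)` for the token request. Whether and how Kafka's OAuth client should read `ssl.truststore.*` for this purpose is exactly what the issue asks, so treat this only as an illustration of the missing step.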
[jira] [Updated] (KAFKA-13703) OAUTHBEARER client will not use defined truststore
[ https://issues.apache.org/jira/browse/KAFKA-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-13703: -- Component/s: clients > OAUTHBEARER client will not use defined truststore > -- > > Key: KAFKA-13703 > URL: https://issues.apache.org/jira/browse/KAFKA-13703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.1.0 >Reporter: Adam Long >Assignee: Kirk True >Priority: Major > > I am developing a Kafka client that uses OAUTHBEARER and SSL to connect. I'm > attempting to test against a server using a key from a custom CA. I added > the trust-chain for the server to a Truststore JKS file, and referenced it in > the configuration. However, I continually get PKIX errors. After some code > tracing, I believe the OAUTHBEARER client code ignores defined truststores. > Here is an example based on my configuration: > {code:java} > application.id=my-kafka-client > client.id=my-kafka-client > group.id=my-kafka-client > # OAuth/SSL listener > bootstrap.servers=:9096 > security.protocol=SASL_SSL > # OAuth Configuration > sasl.mechanism=OAUTHBEARER > sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler > sasl.login.connect.timeout.ms=15000 > sasl.oauthbearer.token.endpoint.url=https:///auth/realms//protocol/openid-connect/token > ssl.truststore.location=\kafka.truststore.jks > #ssl.truststore.password=changeit > sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule > required \ > clientId="my-kafka-client" \ > clientSecret="my-kafka-client-secret"; > {code} > Note - my Truststore does not have a password (I initially tried setting one to see if > that would solve the problem).
> I'm using the following example test code: > {code:java} > package example; > import java.io.IOException; > import java.net.URISyntaxException; > import java.util.Properties; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.common.serialization.StringDeserializer; > import org.apache.kafka.common.serialization.StringSerializer; > public class Main { >public static void main(final String[] args) throws IOException, > URISyntaxException { > Properties config = new Properties(); > > config.load(Main.class.getClassLoader().getResourceAsStream("client.conf")); > config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class); > config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class); > > final KafkaConsumer<String, String> consumer = new > KafkaConsumer<>(config); >} > } > {code} > The issue seems to be in the > {{org.apache.kafka.common.security.oauthbearer.secured}} package - in > particular the {{AccessTokenRetrieverFactory.create()}} method, as it creates > an sslContext but does not include the configured truststore from the Kafka > configuration. > As such, it appears that unless you alter the JVM-default truststore, you > cannot connect to a server running a custom trust-chain.
[jira] [Updated] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout
[ https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-10228: -- Component/s: producer > producer: NETWORK_EXCEPTION is thrown instead of a request timeout > -- > > Key: KAFKA-10228 > URL: https://issues.apache.org/jira/browse/KAFKA-10228 > Project: Kafka > Issue Type: Improvement > Components: clients, producer >Affects Versions: 2.3.1 >Reporter: Christian Becker >Assignee: Kirk True >Priority: Major > > We're currently seeing an issue with the Java client (producer) when message > producing runs into a timeout. Namely, a NETWORK_EXCEPTION is thrown instead > of a timeout exception. > *Situation and relevant code:* > Config > {code:java} > request.timeout.ms: 200 > retries: 3 > acks: all{code} > {code:java} > for (UnpublishedEvent event : unpublishedEvents) { > ListenableFuture> future; > future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), > event.getKafkaKey(), event.getPayload())); > futures.add(future.completable()); > } > CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code} > We're using the KafkaTemplate from Spring Boot here, but it shouldn't matter, > as it's merely a wrapper. There we put in batches of messages to be sent. > 200ms later, we can see the following in the logs (we're not sure about the order; > they arrived in the same ms, so our logging system might not display them > in the right order): > {code:java} > [Producer clientId=producer-1] Received invalid metadata error in produce > request on partition events-6 due to > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received.. Going to request metadata update now > [Producer clientId=producer-1] Got error produce response with correlation id > 3094 on topic-partition events-6, retrying (2 attempts left).
Error: > NETWORK_EXCEPTION {code} > There is also a corresponding error on the broker (within a few ms): > {code:java} > Attempting to send response via channel for which there is no open > connection, connection id XXX (kafka.network.Processor) {code} > This was somewhat unexpected and sent us on a hunt across the infrastructure > for possible connection issues, but we found none. > Side note: In some cases the retries worked and the messages were > successfully produced. > Only after many hours of heavy debugging did we notice that the error might > be related to the low timeout setting. We've removed that setting now, as it > was a remnant from the past and no longer valid for our use case. However, to > spare others the same issue and to simplify future > debugging, some form of timeout exception should be thrown. -- This message was sent by Atlassian Jira (v8.20.10#820010)
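One hedged reading of the numbers in KAFKA-10228: request.timeout.ms bounds each in-flight attempt, while delivery.timeout.ms (KIP-91, default 120000 ms) bounds the whole send including retries. With request.timeout.ms=200 and retries=3, the retries can run out long before the delivery timeout, so the error surfaced would plausibly be the last attempt's failure rather than a timeout. The back-of-the-envelope model below is hypothetical, not producer code; it assumes every attempt waits the full request timeout, a constant retry.backoff.ms, the 2.x defaults named in the code, and it ignores linger.ms and batching.

```java
import java.util.Properties;

// Hypothetical model, not producer code. Assumes every attempt waits the full
// request.timeout.ms and that retry.backoff.ms is constant between attempts.
final class ProducerRetryBudget {

    private static long longProp(Properties p, String key, long defaultValue) {
        String value = p.getProperty(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    // Worst-case time to use up every attempt: (retries + 1) timeouts plus one backoff per retry.
    // Defaults assumed: request.timeout.ms=30000, retries=Integer.MAX_VALUE, retry.backoff.ms=100.
    static long worstCaseRetryExhaustionMs(Properties p) {
        long requestTimeoutMs = longProp(p, "request.timeout.ms", 30_000L);
        long retries = longProp(p, "retries", Integer.MAX_VALUE);
        long backoffMs = longProp(p, "retry.backoff.ms", 100L);
        return (retries + 1) * requestTimeoutMs + retries * backoffMs;
    }

    // True when, in this model, retries run out before delivery.timeout.ms elapses,
    // so the last per-attempt error would be reported instead of a delivery timeout.
    static boolean retriesExhaustFirst(Properties p) {
        long deliveryTimeoutMs = longProp(p, "delivery.timeout.ms", 120_000L);
        return worstCaseRetryExhaustionMs(p) < deliveryTimeoutMs;
    }
}
```

For the reporter's settings the model gives 4 * 200 + 3 * 100 = 1100 ms, far below the 120000 ms default delivery timeout; with all defaults, the delivery timeout is reached first.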
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Component/s: system tests (was: unit tests) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and > {{AsyncKafkaConsumer}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{TimeoutException}}. > h3. Suggested fix > TBD :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
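The difference between the two styles in KAFKA-15848 can be reproduced outside Kafka with plain java.util.concurrent. In the toy sketch below, a hypothetical Callable stands in for the network round trip (returning null would mean "no response yet"). legacyGetCount runs it on the caller thread and checks for a result before consulting the deadline; asyncGetCount hands it to a background executor and calls Future.get with the caller's timeout. Neither method is the real consumer code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy contrast of the two timeout styles; not the real consumer code.
final class TimeoutStyles {

    // Legacy style: do one round of work on the caller thread, check for a result,
    // and only then consult the deadline (the do/while shape from the issue).
    static int legacyGetCount(Callable<Integer> pollOnce, long timeoutMs) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            Integer result = pollOnce.call(); // null means "no response yet"
            if (result != null)
                return result;
        } while (System.nanoTime() < deadline);
        return -1;
    }

    // Async style: enqueue the work for a background thread, then block on Future.get
    // with the caller's timeout. A timeout of 0 fails unless the work is already done.
    // The timed-out task is not cancelled; it keeps running in the background.
    static int asyncGetCount(ExecutorService background, Callable<Integer> work, long timeoutMs)
            throws InterruptedException, ExecutionException {
        Future<Integer> future = background.submit(work);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> slowWork = () -> { Thread.sleep(50); return 42; };
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            System.out.println("legacy, timeout 0:    " + legacyGetCount(slowWork, 0));
            System.out.println("async,  timeout 0:    " + asyncGetCount(background, slowWork, 0));
            System.out.println("async,  timeout 1000: " + asyncGetCount(background, slowWork, 1000));
        } finally {
            background.shutdownNow();
        }
    }
}
```

With a 50 ms operation and a timeout of 0, the legacy-style method still returns 42 because its first round finishes before the deadline is checked, while the async-style method returns -1; given enough time, both succeed.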
[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16010: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor kip-848-client-support timeout) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. 
You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16104: -- Component/s: system tests > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, system tests >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testMultiConsumerSessionTimeoutOnStopPolling > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16009: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16009: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor kip-848-client-support timeout) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16008: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
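The repeated WARN lines in these tests describe a simple rule: if the time between consecutive poll() calls exceeds max.poll.interval.ms, the consumer treats its poll timeout as expired. A minimal sketch of that bookkeeping follows. The class is hypothetical (it is not HeartbeatRequestManager), times are passed in explicitly to keep it deterministic, and the strict comparison is an assumption.

```java
// Hypothetical poll-interval bookkeeping; times are injected so the logic is deterministic.
final class PollIntervalTracker {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTracker(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Application thread: record each call to poll().
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // Heartbeat side: has the gap since the last poll() exceeded max.poll.interval.ms?
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    // Time left before the poll timeout expires (0 once it has).
    long remainingMs(long nowMs) {
        return Math.max(0L, maxPollIntervalMs - (nowMs - lastPollMs));
    }
}
```

Tests such as testMaxPollIntervalMs appear to stop polling on purpose so that this condition fires and a rebalance follows; the failures above report that the expected rebalance was not observed in time.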
[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16010: -- Component/s: system tests (was: unit tests) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
[ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15908: -- Labels: consumer-threading-refactor timeout (was: ) > Remove deprecated Consumer API poll(long timeout) > - > > Key: KAFKA-15908 > URL: https://issues.apache.org/jira/browse/KAFKA-15908 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, timeout > Fix For: 4.0.0 > > > Per > [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], > the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the {{poll}} method that takes a single {{long}} timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics
[ https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16142: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Update metrics documentation for errors and new metrics > --- > > Key: KAFKA-16142 > URL: https://issues.apache.org/jira/browse/KAFKA-16142 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics
[ https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16142: -- Component/s: documentation > Update metrics documentation for errors and new metrics > --- > > Key: KAFKA-16142 > URL: https://issues.apache.org/jira/browse/KAFKA-16142 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics
[ https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16142: -- Component/s: metrics > Update metrics documentation for errors and new metrics > --- > > Key: KAFKA-16142 > URL: https://issues.apache.org/jira/browse/KAFKA-16142 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
[ https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16112: -- Component/s: metrics > Review JMX metrics in Async Consumer and determine the missing ones > --- > > Key: KAFKA-16112 > URL: https://issues.apache.org/jira/browse/KAFKA-16112 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
[ https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16112: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Review JMX metrics in Async Consumer and determine the missing ones > --- > > Key: KAFKA-16112 > URL: https://issues.apache.org/jira/browse/KAFKA-16112 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16143) New metrics for KIP-848 protocol
[ https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16143: -- Labels: kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > New metrics for KIP-848 protocol > > > Key: KAFKA-16143 > URL: https://issues.apache.org/jira/browse/KAFKA-16143 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15561: -- Labels: kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16143) New metrics for KIP-848 protocol
[ https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16143: -- Component/s: metrics > New metrics for KIP-848 protocol > > > Key: KAFKA-16143 > URL: https://issues.apache.org/jira/browse/KAFKA-16143 > Project: Kafka > Issue Type: Task > Components: clients, consumer, metrics >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16111: -- Labels: callback consumer-threading-refactor (was: callback consumer-threading-refactor kip-848-client-support) > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: callback, consumer-threading-refactor > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned
[ https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16194: -- Labels: kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > KafkaConsumer.groupMetadata() should be correct when first records are > returned > --- > > Key: KAFKA-16194 > URL: https://issues.apache.org/jira/browse/KAFKA-16194 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Bruno Cadonna >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The following code returns records before the group metadata is updated. This > fails the first transactions ever run by the Producer/Consumer. > > {code:java} > Producer txnProducer = new KafkaProducer<>(txnProducerProps); > Consumer consumer = new KafkaConsumer<>(consumerProps); > txnProducer.initTransactions(); > System.out.println("Init transactions called"); > try { > txnProducer.beginTransaction(); > System.out.println("Begin transactions called"); > consumer.subscribe(Collections.singletonList("input")); > System.out.println("Consumer subscribed to topic -> KIP848-topic-2 "); > ConsumerRecords records = > consumer.poll(Duration.ofSeconds(10)); > System.out.println("Returned " + records.count() + " records."); > // Process and send txn messages. 
> for (ConsumerRecord processedRecord : records) { > txnProducer.send(new ProducerRecord<>("output", > processedRecord.key(), "Processed: " + processedRecord.value())); > } > ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); > System.out.println("Group metadata inside test" + groupMetadata); > Map offsetsToCommit = new HashMap<>(); > for (ConsumerRecord record : records) { > offsetsToCommit.put(new TopicPartition(record.topic(), > record.partition()), > new OffsetAndMetadata(record.offset() + 1)); > } > System.out.println("Offsets to commit" + offsetsToCommit); > // Send offsets to transaction with ConsumerGroupMetadata. > txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata); > System.out.println("Send offsets to transaction done"); > // Commit the transaction. > txnProducer.commitTransaction(); > System.out.println("Commit transaction done"); > } catch (ProducerFencedException | OutOfOrderSequenceException | > AuthorizationException e) { > e.printStackTrace(); > txnProducer.close(); > } catch (KafkaException e) { > e.printStackTrace(); > txnProducer.abortTransaction(); > } finally { > txnProducer.close(); > consumer.close(); > } {code} > The issue seems to be that while it waits in `poll`, the event to update the > group metadata is not processed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
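The reporter's diagnosis in KAFKA-16194 suggests an ordering rule: any group-metadata update produced in the background should be applied on the application thread before poll() hands back records. The toy model below illustrates that rule with plain Java collections. ToyAsyncConsumer, generationUpdates, and the integer "generation" are hypothetical stand-ins, not AsyncKafkaConsumer internals or the committed fix.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Toy model only: integers stand in for group generations, strings for records.
final class ToyAsyncConsumer {
    // Filled by a (hypothetical) background thread when the group assignment changes.
    final Queue<Integer> generationUpdates = new ConcurrentLinkedQueue<>();
    private int generationId = -1;

    // Analogue of groupMetadata(): reflects only the events already processed.
    int groupGeneration() {
        return generationId;
    }

    private void processBackgroundEvents() {
        Integer update;
        while ((update = generationUpdates.poll()) != null) {
            generationId = update;
        }
    }

    // Applies pending events on every iteration, and once more before returning records,
    // so that state read right after poll() is at least as new as the records.
    List<String> poll(Supplier<List<String>> fetchOnce, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            processBackgroundEvents();
            List<String> records = fetchOnce.get();
            if (!records.isEmpty()) {
                processBackgroundEvents(); // catch updates that arrived while fetching
                return records;
            }
        }
        processBackgroundEvents();
        return Collections.emptyList();
    }
}
```

In this model, a generation update that lands while records are being fetched is still visible through groupGeneration() by the time the caller sees those records.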
[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Labels: consumer-threading-refactor timeout (was: consumer-threading-refactor kip-848-client-support timeout) > Enforce that events and requests respect user-provided timeout > -- > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
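One way to make event timeouts consistent is to turn the user's timeout into an absolute deadline stored on the event: the application thread waits only for the remaining time, and the background thread fails any event whose deadline has passed. The sketch below is hypothetical (TimedEvent, await, and expire are not the CompletableApplicationEvent API) and takes the current time as a parameter for determinism.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: every event carries an absolute deadline derived from the user's timeout.
final class DeadlineEvents {

    static final class TimedEvent<T> {
        final CompletableFuture<T> future = new CompletableFuture<>();
        final long deadlineMs;

        TimedEvent(long nowMs, long userTimeoutMs) {
            this.deadlineMs = nowMs + userTimeoutMs;
        }
    }

    // Application thread: wait only for whatever is left of the user's timeout.
    static <T> T await(TimedEvent<T> event, long nowMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        long remainingMs = Math.max(0L, event.deadlineMs - nowMs);
        return event.future.get(remainingMs, TimeUnit.MILLISECONDS);
    }

    // Background thread: fail every pending event whose deadline has passed and drop
    // completed ones, so no event outlives the caller's timeout. Returns the number expired.
    static int expire(List<TimedEvent<?>> pending, long nowMs) {
        int expired = 0;
        for (Iterator<TimedEvent<?>> it = pending.iterator(); it.hasNext(); ) {
            TimedEvent<?> event = it.next();
            if (event.future.isDone()) {
                it.remove();
            } else if (nowMs >= event.deadlineMs) {
                event.future.completeExceptionally(
                        new TimeoutException("event expired at " + event.deadlineMs + " ms"));
                it.remove();
                expired++;
            }
        }
        return expired;
    }
}
```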
[jira] [Updated] (KAFKA-16203) AutoCommit of empty offsets blocks following requests due to inflight flag
[ https://issues.apache.org/jira/browse/KAFKA-16203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16203: -- Labels: kip-848-client-support (was: kip-848 kip-848-client-support) > AutoCommit of empty offsets blocks following requests due to inflight flag > -- > > Key: KAFKA-16203 > URL: https://issues.apache.org/jira/browse/KAFKA-16203 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When auto-committing empty offsets, the auto-commit logic completes without generating a request, > but mistakenly leaves the inflight request flag on. As a result, subsequent auto-commits won't generate requests, even > if offsets have been consumed.
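The inflight-flag problem can be illustrated with a simplified state holder. This is a hypothetical sketch: `AutoCommitSketch`, `maybeAutoCommit` and `onCommitResponse` are illustrative names, not the consumer's real classes, and offsets are keyed by a plain "topic-partition" string. The flag is set only when a request is actually produced, so an auto-commit with empty offsets cannot leave it stuck on.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class AutoCommitSketch {

    // Hypothetical, simplified auto-commit state; NOT Kafka's actual
    // commit request manager.
    private boolean inflight = false;

    // Returns the offsets to send, or empty if no request should be sent.
    Optional<Map<String, Long>> maybeAutoCommit(Map<String, Long> consumedOffsets) {
        if (inflight) {
            return Optional.empty(); // a previous auto-commit is still outstanding
        }
        if (consumedOffsets.isEmpty()) {
            // Nothing to commit: complete WITHOUT marking a request as in flight.
            // Setting the flag here would block every later auto-commit,
            // because no response will ever arrive to clear it.
            return Optional.empty();
        }
        inflight = true; // a real request is now outstanding
        Map<String, Long> snapshot = new HashMap<>(consumedOffsets);
        return Optional.of(snapshot);
    }

    // Called when the commit response (success or failure) arrives.
    void onCommitResponse() {
        inflight = false;
    }

    boolean isInflight() {
        return inflight;
    }

    public static void main(String[] args) {
        AutoCommitSketch commits = new AutoCommitSketch();
        System.out.println(commits.maybeAutoCommit(Collections.emptyMap())); // nothing consumed yet
        System.out.println(commits.maybeAutoCommit(Map.of("orders-0", 42L)));
    }
}
```

With the flag tied to request creation, the second call in `main` still produces a commit even though the first call had nothing to send.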
[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15954: -- Labels: kip-848-client-support (was: kip-848 kip-848-client-support) > Review minimal effort approach on consumer last heartbeat on unsubscribe > > > Key: KAFKA-15954 > URL: https://issues.apache.org/jira/browse/KAFKA-15954 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Currently, both the legacy and the new consumer follow a minimal-effort approach when > sending a leave group request (legacy) or last heartbeat request (new consumer). The > request is sent without waiting for or handling any response. This behaviour applies > when the consumer is being closed or when it unsubscribes. > > When the consumer is being closed (which is a "terminal" > state), it makes sense to just follow a minimal-effort approach for > "properly" leaving the group. For unsubscribe, however, it may > make sense to put in a little more effort to make sure that the last heartbeat > is sent and received by the broker. Note that unsubscribe could be a temporary > state, where the consumer might want to re-join the group at any time.
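The two approaches discussed in this issue can be contrasted in a small sketch: fire-and-forget on close, and a bounded wait for the broker's acknowledgement on unsubscribe. `LeaveGroupSketch`, `onClose` and `onUnsubscribe` are hypothetical names. The heartbeat transport is modeled as a `Supplier<CompletableFuture<Void>>` rather than the consumer's real network client, so this only illustrates the idea, not how the consumer implements it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class LeaveGroupSketch {

    // Hypothetical transport: sends the last heartbeat (or LeaveGroup request)
    // and returns a future that completes when the broker's response arrives.
    private final Supplier<CompletableFuture<Void>> sendLastHeartbeat;

    LeaveGroupSketch(Supplier<CompletableFuture<Void>> sendLastHeartbeat) {
        this.sendLastHeartbeat = sendLastHeartbeat;
    }

    // close(): terminal state, so fire-and-forget (the current minimal-effort approach).
    void onClose() {
        sendLastHeartbeat.get(); // the response is neither awaited nor handled
    }

    // unsubscribe(): the consumer may rejoin later, so wait (bounded by timeoutMs)
    // for the broker to acknowledge the last heartbeat.
    // Returns true only if a successful response arrived in time.
    boolean onUnsubscribe(long timeoutMs) {
        CompletableFuture<Void> response = sendLastHeartbeat.get();
        try {
            response.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // request failed or timed out; the caller may retry
        }
    }

    public static void main(String[] args) {
        // A broker that answers immediately.
        LeaveGroupSketch acked = new LeaveGroupSketch(() -> CompletableFuture.completedFuture(null));
        System.out.println("acknowledged: " + acked.onUnsubscribe(100));

        // A broker that never answers.
        LeaveGroupSketch silent = new LeaveGroupSketch(CompletableFuture::new);
        System.out.println("acknowledged: " + silent.onUnsubscribe(100));
        silent.onClose(); // returns immediately regardless of any response
    }
}
```

Bounding the wait keeps `unsubscribe` from hanging indefinitely when the coordinator is unavailable. It still gives the broker a chance to confirm that the member left before the consumer rejoins.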