Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1772180903 CI looks OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15658) Zookeeper 3.6.3 jar | CVE-2023-44981
masood created KAFKA-15658: -- Summary: Zookeeper 3.6.3 jar | CVE-2023-44981 Key: KAFKA-15658 URL: https://issues.apache.org/jira/browse/KAFKA-15658 Project: Kafka Issue Type: Bug Reporter: masood The [CVE-2023-44981|https://www.mend.io/vulnerability-database/CVE-2023-44981] vulnerability has been reported in the zookeeper.jar. It's worth noting that the latest version of Kafka depends on version 3.8.2 of ZooKeeper, which is also impacted by this vulnerability: [https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2|https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2] Could you please verify its impact on Kafka? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15591) Trogdor produce workload reports errors in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1578#comment-1578 ] Xi Yang edited comment on KAFKA-15591 at 10/20/23 6:01 AM: --- I print out the topic description after creating the topic. It looks like the partitions are correctly elected before Trogdor starts producing messages. However, the producer still reports the NOT_LEADER_OR_FOLLOWER error. Topic desc:(name=foo1, internal=false, partitions=(partition=0, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=1, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=2, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=3, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=4, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=5, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=6, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=7, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=8, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)),(partition=9, leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: null), isr=localhost:9092 (id: 1 rack: null)), authorizedOperations=null) Create topics:[foo1-9, foo1-8, foo1-7, foo1-6, foo1-5, foo1-4, foo1-3, foo1-2, foo1-1, 
foo1-0] [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce response with correlation id 4 on topic-partition foo1-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition foo1-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce response with correlation id 4 on topic-partition foo1-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition foo1-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce response with correlation id 4 on topic-partition foo1-5, retrying (2147483646 attempts left). 
Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition foo1-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce response with correlation id 4 on topic-partition foo1-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender) [2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition foo1-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broke
Re: [PR] KAFKA-15632: Drop the invalid remote log metadata events [kafka]
kamalcph commented on code in PR #14576: URL: https://github.com/apache/kafka/pull/14576#discussion_r1364317572 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java: ## @@ -302,22 +307,26 @@ public void addCopyInProgressSegment(RemoteLogSegmentMetadata remoteLogSegmentMe RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId(); RemoteLogSegmentMetadata existingMetadata = idToSegmentMetadata.get(remoteLogSegmentId); -checkStateTransition(existingMetadata != null ? existingMetadata.state() : null, -remoteLogSegmentMetadata.state()); - +boolean isValid = checkStateTransition(existingMetadata != null ? existingMetadata.state() : null, +remoteLogSegmentMetadata.state(), remoteLogSegmentMetadata.remoteLogSegmentId()); +if (!isValid) { +return; +} for (Integer epoch : remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()) { leaderEpochEntries.computeIfAbsent(epoch, leaderEpoch -> new RemoteLogLeaderEpochState()) .handleSegmentWithCopySegmentStartedState(remoteLogSegmentId); } - idToSegmentMetadata.put(remoteLogSegmentId, remoteLogSegmentMetadata); } -private void checkStateTransition(RemoteLogSegmentState existingState, RemoteLogSegmentState targetState) { -if (!RemoteLogSegmentState.isValidTransition(existingState, targetState)) { -throw new IllegalStateException( -"Current state: " + existingState + " can not be transitioned to target state: " + targetState); +private boolean checkStateTransition(RemoteLogSegmentState existingState, + RemoteLogSegmentState targetState, + RemoteLogSegmentId segmentId) { +boolean isValid = RemoteLogSegmentState.isValidTransition(existingState, targetState); +if (!isValid) { +log.error("Current state: {} can not be transitioned to target state: {}, segmentId: {}. Dropping the event", Review Comment: Logging the error instead of throwing the exception as it will stop the internal consumer which consumes from the remote log metadata topic. 
To clarify, producer `enable.idempotence` is set to true by default since v3.2. In our internal cluster, producer idempotence was not enabled, and we have seen out-of-order messages in the internal topic. Once this issue happens, the internal consumer stops processing messages and then fails to upload the pending segments to remote storage. The issue is not recoverable even after broker restarts.
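The review comment above describes replacing a thrown IllegalStateException with a logged error, so that an invalid metadata event is simply dropped and the consumer of the remote log metadata topic keeps running. A minimal, self-contained sketch of that pattern (the state names are modeled on `RemoteLogSegmentState`, but the transition table and class here are illustrative, not the Kafka source):

```java
// Sketch of the "log and drop" pattern: report invalid state transitions
// instead of throwing, so the metadata-topic consumer thread stays alive.
public class TransitionCheck {
    public enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    // Whether `target` may follow `existing`; a null `existing` means a brand-new segment.
    // (Illustrative table, not the exact RemoteLogSegmentState rules.)
    public static boolean isValidTransition(State existing, State target) {
        switch (target) {
            case COPY_SEGMENT_STARTED:    return existing == null;
            case COPY_SEGMENT_FINISHED:   return existing == State.COPY_SEGMENT_STARTED;
            case DELETE_SEGMENT_STARTED:  return existing == State.COPY_SEGMENT_STARTED
                                               || existing == State.COPY_SEGMENT_FINISHED;
            case DELETE_SEGMENT_FINISHED: return existing == State.DELETE_SEGMENT_STARTED;
            default:                      return false;
        }
    }

    // Returning false (and logging) instead of throwing lets the caller skip
    // the invalid event, matching the rationale in the review comment.
    public static boolean checkStateTransition(State existing, State target, String segmentId) {
        if (!isValidTransition(existing, target)) {
            System.err.printf("Current state: %s can not be transitioned to target state: %s, segmentId: %s. Dropping the event%n",
                    existing, target, segmentId);
            return false;
        }
        return true;
    }
}
```

Callers check the boolean and return early, instead of letting an exception unwind the internal consumer thread.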
[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1567#comment-1567 ] Ismael Juma commented on KAFKA-15657: - I was wondering the same. We should fix KAFKA-15653 and see if it's the source of the issues you have been seeing. I am not aware of any other change that would result in that sort of problem. > Unexpected errors when producing transactionally in 3.6 > --- > > Key: KAFKA-15657 > URL: https://issues.apache.org/jira/browse/KAFKA-15657 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > > In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD > (which I created a separate issue for), and INVALID_TXN_STATE and > UNKNOWN_SERVER_ERROR. > INVALID_TXN_STATE is being returned even though the partitions have been > added to the transaction (AddPartitionsToTxn). Nothing about the code has > changed between 3.5 and 3.6, and I have loop-integration-tested this code > against 3.5 thousands of times. 3.6 is newly - and always - returning > INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I > eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the > broker logs, the broker indicates that sequence numbers are out of order - > but (a) I am repeating requests that were in order (so something on the > broker got a little haywire maybe? or maybe this is due to me ignoring > invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I > am receiving UNKNOWN_SERVER_ERROR. > I think the main problem is the client unexpectedly receiving > INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to > USE on return for some reason.
[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1565#comment-1565 ] Travis Bischel commented on KAFKA-15657: I'm beginning to suspect that KAFKA-15653 may eventually lead to this; I never experience this bug without first experiencing the NPEs while appending. I'll wait until KAFKA-15653 is addressed and then loop-test to see whether this still occurs.
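For context on the protocol ordering being debated in this issue: a transactional Produce is only legal for partitions previously registered via AddPartitionsToTxn; otherwise the broker answers INVALID_TXN_STATE. A toy model of that broker-side check (purely illustrative — not broker code, with error codes simplified to strings and partitions to names):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the transactional ordering constraint:
// beginTransaction -> addPartitionsToTxn(tp) -> produce(tp) -> (commit/abort).
// Producing to a partition that was never added yields INVALID_TXN_STATE.
public class TxnModel {
    private final Set<String> addedPartitions = new HashSet<>();
    private boolean inTransaction = false;

    public void beginTransaction() {
        inTransaction = true;
        addedPartitions.clear();
    }

    public void addPartitionsToTxn(String topicPartition) {
        addedPartitions.add(topicPartition);
    }

    public String produce(String topicPartition) {
        if (!inTransaction) return "INVALID_TXN_STATE";
        // The check at issue: the partition must be part of the transaction.
        if (!addedPartitions.contains(topicPartition)) return "INVALID_TXN_STATE";
        return "NONE";
    }
}
```

The reporter's point is that the client already enforces this ordering (produce is gated on a successful AddPartitionsToTxn response), so a 3.6 broker returning INVALID_TXN_STATE anyway points at a broker-side regression rather than a client protocol violation.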
[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated KAFKA-15629: Fix Version/s: 3.7.0 > proposal to introduce IQv2 Query Types: TimestampedKeyQuery and > TimestampedRangeQuery > - > > Key: KAFKA-15629 > URL: https://issues.apache.org/jira/browse/KAFKA-15629 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > KIP-992: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery > In the current IQv2 code, there are noticeable differences when interfacing > with plain-kv-store and ts-kv-store. Notably, the return type V acts as a > simple value for plain-kv-store but evolves into ValueAndTimestamp for > ts-kv-store, which presents type safety issues in the API. > Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring > compatibility concerns. > This brings us to the essence of our proposal: the introduction of distinct > query types. One that returns a plain value, another for values accompanied > by timestamps. > While querying a ts-kv-store for a plain value and then extracting it is > feasible, it doesn't make sense to query a plain-kv-store for a > ValueAndTimestamp. > Our vision is for plain-kv-store to always return V, while ts-kv-store should > return ValueAndTimestamp.
[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2
[ https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated KAFKA-15527: Fix Version/s: 3.7.0 > Add reverseRange and reverseAll query over kv-store in IQv2 > --- > > Key: KAFKA-15527 > URL: https://issues.apache.org/jira/browse/KAFKA-15527 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > Add reverseRange and reverseAll query over kv-store in IQv2 > Update an implementation of the Query interface, introduced in [KIP-796: > Interactive Query > v2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2] > , to support reverseRange and reverseAll. > Use bounded query to achieve reverseRange and use unbounded query to achieve > reverseAll.
Re: [PR] KAFKA-14808: fix leaderless partition issue when controller removes u… [kafka]
github-actions[bot] commented on PR #13451: URL: https://github.com/apache/kafka/pull/13451#issuecomment-1772023180 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations
[ https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1550#comment-1550 ] Matthias J. Sax commented on KAFKA-7699: Happy to support you. The KIP wiki page describes how it works: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] – if you have any questions about it, happy to answer them. > Improve wall-clock time punctuations > > > Key: KAFKA-7699 > URL: https://issues.apache.org/jira/browse/KAFKA-7699 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Currently, wall-clock time punctuations allow scheduling periodic callbacks > based on wall-clock time progress. The punctuation timer starts when the > punctuation is scheduled; thus, the firing times are non-deterministic, which > is what is desired for many use cases (I want a call-back in 5 minutes from > "now"). > It would be a nice improvement to allow users to "anchor" wall-clock > punctuations, too, similar to a cron job: a punctuation would then be > triggered at "fixed" times, like the beginning of the next hour, independent > of when the punctuation was registered.
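Until an "anchored" punctuation API exists, one way to approximate the cron-like behavior described in this issue is to compute the wall-clock delay from "now" to the next hour boundary and use it when scheduling the first punctuation. The helper below is a hypothetical sketch (`AnchoredPunctuation` and `delayUntilNextHour` are made-up names, not part of Kafka Streams):

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Computes how long to wait so that a wall-clock callback fires at the top of
// the next hour, regardless of when it was registered.
public class AnchoredPunctuation {
    public static Duration delayUntilNextHour(Instant now) {
        ZonedDateTime zdt = now.atZone(ZoneOffset.UTC);
        // Truncate to the current hour, then step forward one hour.
        ZonedDateTime nextHour = zdt.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        return Duration.between(zdt, nextHour);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2023-10-20T05:43:42Z");
        System.out.println(delayUntilNextHour(t)); // PT16M18S
    }
}
```

In a Streams `Processor` one might feed this delay into `context.schedule(..., PunctuationType.WALL_CLOCK_TIME, ...)`; note, though, that `schedule` interprets its `Duration` as the repeating interval, so a truly anchored punctuator would also need to re-check the boundary inside its callback.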
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1549#comment-1549 ] Ismael Juma commented on KAFKA-15653: - cc [~divijvaidya] > NPE in ChunkedByteStream > > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. >Reporter: Travis Bischel >Priority: Major > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat}
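The NPE above comes from a constructor dereferencing a buffer it was handed (the fuller trace later in this thread shows line 89 invoking `hasArray()` on a null `intermediateBufRef`). A fail-fast guard of the kind sketched below would surface a descriptive error at the boundary instead of a bare NPE deep inside the constructor; this illustrates the defensive pattern only and is not the actual ChunkedBytesStream fix:

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Illustrative guard: validate the supplied intermediate buffer up front so a
// null (e.g. from a misbehaving buffer supplier) fails with a clear message.
public class GuardedStream {
    private final ByteBuffer intermediateBuf;

    public GuardedStream(ByteBuffer supplied) {
        // requireNonNull turns a latent NPE into a descriptive error at construction time.
        this.intermediateBuf = Objects.requireNonNull(supplied, "buffer supplier returned null");
    }

    public boolean backedByArray() {
        return intermediateBuf.hasArray();
    }
}
```

With a guard like this, the broker-side error log would name the real culprit (a null buffer from the supplier) rather than an opaque `java.lang.NullPointerException`.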
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1542#comment-1542 ] Travis Bischel edited comment on KAFKA-15653 at 10/20/23 2:55 AM: -- {noformat} [2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing append operation on partition 2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 (kafka.server.ReplicaManager) java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null at org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89) at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805) at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845) at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728) at 
scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754) at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686) at kafka.server.KafkaApis.handle(KafkaApis.scala:180) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149) at java.base/java.lang.Thread.run(Thread.java:833) {noformat}
Re: [PR] KAFKA-15607:Possible NPE is thrown in MirrorCheckpointTask [kafka]
hudeqi commented on code in PR #14587: URL: https://github.com/apache/kafka/pull/14587#discussion_r1366378637 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java: ## @@ -169,6 +169,33 @@ public void testSyncOffset() { "Consumer 2 " + topic2 + " failed"); } +@Test +public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() { +Map> idleConsumerGroupsOffset = new HashMap<>(); +Map> checkpointsPerConsumerGroup = new HashMap<>(); + +String consumer = "consumer"; +String topic = "topic"; +Map ct = new HashMap<>(); +TopicPartition tp = new TopicPartition(topic, 0); +// Simulate other clients such as sarama to reset the group offset of the target cluster to -1. At this time, +// the obtained `OffsetAndMetadata` of the target cluster is null. Review Comment: committed.
Re: [PR] KAFKA-15607:Possible NPE is thrown in MirrorCheckpointTask [kafka]
hudeqi commented on PR #14587: URL: https://github.com/apache/kafka/pull/14587#issuecomment-1771985878 > Thanks @hudeqi, I think this is reasonable. Do you know why Sarama sets offsets to -1? If it's for normal operations and not indicative of something wrong, we may not even need to log a warning message in that case and could change the check [here](https://github.com/apache/kafka/blob/af747fbfed7e81617c3b3ad0e4dc8c857aa9502b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L325) from `!targetConsumerOffset.containsKey(topicPartition)` to `targetConsumerOffset.get(topicPartition) == null`. Thanks for your review, @C0urante. In fact, resetting the offset directly to -1 is an abnormal operation, whether from Sarama or any other client. We discovered the problem this way: using the Sarama client, we wanted to reset the group's offset to the latest, so we passed Sarama's `OffsetNewest` constant as the argument to the reset-offset method. The offset ended up reset to -1, because the value of `OffsetNewest` is -1. In Sarama, resetting to the latest actually requires a different procedure, but unlike the Java client, Sarama does not intercept this kind of misuse (which would be friendlier). That is how this issue surfaced in scenarios like MM2, so I think it is better to add a warn log here.
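The check change suggested above — treating `targetConsumerOffset.get(topicPartition) == null` as "no usable offset", whether the key is absent or mapped to a null `OffsetAndMetadata` — can be sketched in isolation. Everything below is illustrative: a plain map stands in for the admin-client result, and `shouldEmitCheckpoint` is a made-up name, not the MirrorCheckpointTask source:

```java
import java.util.Map;

// Null-guard sketch: a group whose offset was reset to -1 can come back from
// the target cluster as a key that is *present but mapped to null*, which the
// original containsKey() check would mistake for a real committed offset.
public class CheckpointGuard {
    public static boolean shouldEmitCheckpoint(Map<String, Object> targetOffsets, String topicPartition) {
        Object offset = targetOffsets.get(topicPartition);
        if (offset == null && targetOffsets.containsKey(topicPartition)) {
            // Warn, as suggested in the thread: a null value usually means some
            // client reset the group offset to -1 (the Sarama misuse case).
            System.err.println("Null committed offset for " + topicPartition
                    + "; was the group offset reset to -1?");
        }
        // Both "no entry" and "entry with null value" mean the checkpoint should
        // still be emitted, which avoids the NPE on the null OffsetAndMetadata.
        return offset == null;
    }
}
```

The `get(...) == null` form subsumes the old `!containsKey(...)` check while also covering the null-value case that triggered the NPE.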
[jira] [Comment Edited] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1537#comment-1537 ] Travis Bischel edited comment on KAFKA-15657 at 10/20/23 2:35 AM: -- re: first comment – the client doesn't advance to producing unless AddPartitionsToTxn succeeds. If the request partially succeeds, failed partitions are stripped and only successfully added partitions are produced. The logic is definitely hard to follow if you're not familiar with the code, but here's issuing/stripping: [here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L442-L498] and here's where the request is issued (in the same function as producing – before the produce request is issued): [here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L316-L357] Also wrt race condition – these tests also pass against the redpanda binary, which has always had KIP-890 semantics / has never allowed transactional produce requests unless the partition has been added to the transaction (in fact this is part of how I caught some early redpanda bugs with _that_ implementation). re: second comment, I'll capture some debug logs so you can see both the client logs and the container. The tests currently are using v3. I'm currently running this in a loop: ``` docker compose down; sleep 1; docker compose up -d; sleep 5; while go test -run Txn/cooperative > logs; do echo whoo; docker compose down; sleep 1; docker compose up -d; sleep 5; done ``` Once this fails, I'll upload the logs. This is currently ignoring INVALID_RECORD, which I more regularly run into. I may remove gating this to just the cooperative test and instead run it against all balancers at once (it seems heavier load runs into the problem more frequently). 
Also this does remind me though, somebody had a feature request that deliberately abused the ability to produce before AddPartitionsToTxn was done; I need to remove support of this for 3.6+. This _is_ exercised in franz-go's CI right now and will fail CI for 3.6+ (see the doc comment on [EndBeginTxnUnsafe|https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#EndBeginTxnHow]). Edit: KAFKA-15653 may be complicating the investigation here, too.
> Unexpected errors when producing transactionally in 3.6 > --- > > Key: KAFKA-15657 > URL: https://issues.apache.org/jira/browse/KAFKA-15657 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > > In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD > (which I created a se
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1542#comment-1542 ] Travis Bischel commented on KAFKA-15653: Not just : {noformat} [2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing append operation on partition 2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 (kafka.server.ReplicaManager) java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89) at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805) at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845) at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728) at 
scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754) at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686) at kafka.server.KafkaApis.handle(KafkaApis.scala:180) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149) at java.base/java.lang.Thread.run(Thread.java:833) {noformat} > NPE in ChunkedByteStream > > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. >Reporter: Travis Bischel >Priority: Major > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. 
The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > sc
[jira] [Updated] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Travis Bischel updated KAFKA-15653: --- Summary: NPE in ChunkedByteStream (was: NPE in ChunkedByteStream.) > NPE in ChunkedByteStream > > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. >Reporter: Travis Bischel >Priority: Major > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15346: add support for single-Key_single-timestamp IQs with versioned state stores (KIP-960) [kafka]
aliehsaeedii opened a new pull request, #14596: URL: https://github.com/apache/kafka/pull/14596 This PR implements KIP-960. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1537#comment-1537 ] Travis Bischel commented on KAFKA-15657: re: first comment – the client doesn't advance to producing unless AddPartitionsToTxn succeeds. If the request partially succeeds, failed partitions are stripped and only successfully added partitions are produced. The logic is definitely hard to follow if you're not familiar with the code, but here's issuing/stripping: [here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L442-L498] and here's where the request is issued (in the same function as producing – before the produce request is issued): [here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L316-L357] Also wrt race condition – these tests also pass against the redpanda binary, which has always had KIP-890 semantics / has never allowed transactional produce requests unless the partition has been added to the transaction (in fact this is part of how I caught some early redpanda bugs with _that_ implementation). re: second comment, I'll capture some debug logs so you can see both the client logs and the container. The tests currently are using v3. I'm currently running this in a loop: ``` docker compose down; sleep 1; docker compose up -d; sleep 5; while go test -run Txn/cooperative > logs; do echo whoo; docker compose down; sleep 1; docker compose up -d; sleep 5; done ``` Once this fails, I'll upload the logs. This is currently ignoring INVALID_RECORD, which I more regularly run into. I may remove gating this to just the cooperative test and instead run it against all balancers at once (it seems heavier load runs into the problem more frequently). 
Also this does remind me though, somebody had a feature request that deliberately abused the ability to produce before AddPartitionsToTxn was done, I need to remove support of this for 3.6+. This _is_ exercised in franz-go's CI right now and will fail CI for 3.6+ (see the doc comment on [EndBeginTxnUnsafe|https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#EndBeginTxnHow]). > Unexpected errors when producing transactionally in 3.6 > --- > > Key: KAFKA-15657 > URL: https://issues.apache.org/jira/browse/KAFKA-15657 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > > In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD > (which I created a separate issue for), and INVALID_TXN_STATE and > UNKNOWN_SERVER_ERROR. > INVALID_TXN_STATE is being returned even though the partitions have been > added to the transaction (AddPartitionsToTxn). Nothing about the code has > changed between 3.5 and 3.6, and I have loop-integration-tested this code > against 3.5 thousands of times. 3.6 is newly - and always - returning > INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I > eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the > broker logs, the broker indicates that sequence numbers are out of order - > but (a) I am repeating requests that were in order (so something on the > broker got a little haywire maybe? or maybe this is due to me ignoring > invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I > am receiving UNKNOWN_SERVER_ERROR. > I think the main problem is the client unexpectedly receiving > INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to > USE on return for some reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
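As a toy model of the invariant under discussion in this thread (hypothetical names — this is not Kafka's actual broker code): under KIP-890 semantics, a transactional produce to a partition that was never added via AddPartitionsToTxn is rejected with INVALID_TXN_STATE.

```java
import java.util.HashSet;
import java.util.Set;

public class TxnInvariant {
    enum Result { OK, INVALID_TXN_STATE }

    private final Set<String> addedPartitions = new HashSet<>();

    // Models a successful AddPartitionsToTxn for one topic-partition.
    void addPartitionToTxn(String topicPartition) {
        addedPartitions.add(topicPartition);
    }

    // Models the broker-side check: produce is only valid for partitions
    // that were previously added to the transaction.
    Result produce(String topicPartition) {
        return addedPartitions.contains(topicPartition)
                ? Result.OK
                : Result.INVALID_TXN_STATE;
    }

    public static void main(String[] args) {
        TxnInvariant txn = new TxnInvariant();
        System.out.println(txn.produce("foo-0"));  // INVALID_TXN_STATE: not yet added
        txn.addPartitionToTxn("foo-0");
        System.out.println(txn.produce("foo-0"));  // OK
    }
}
```

The open question in the thread is why the client can observe AddPartitionsToTxn succeeding and the broker can still report the partition as not added.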
[PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
dengziming opened a new pull request, #14595: URL: https://github.com/apache/kafka/pull/14595 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
iit2009060 commented on PR #14483: URL: https://github.com/apache/kafka/pull/14483#issuecomment-1771965660 > @iit2009060 , do you have any comments to this PR? @showuon No, I am good. Thanks @jeel2420 for addressing the review comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Remove unused method parameter in ConsumerGroupCommand [kafka]
showuon merged PR #14585: URL: https://github.com/apache/kafka/pull/14585 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Remove unused method parameter in ConsumerGroupCommand [kafka]
showuon commented on PR #14585: URL: https://github.com/apache/kafka/pull/14585#issuecomment-1771961258 Failed tests are unrelated: ``` Build / JDK 17 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeWithPrefixedAcls(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 8 and Scala 2.12 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft Build / JDK 8 and Scala 2.12 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore[cache=false, log=false, supplier=ROCKS_WINDOW, kind=DSL] Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures() Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testTimeouts() Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
showuon commented on PR #14483: URL: https://github.com/apache/kafka/pull/14483#issuecomment-1771958677 @iit2009060 , do you have any comments to this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]
showuon merged PR #14573: URL: https://github.com/apache/kafka/pull/14573 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]
showuon commented on PR #14573: URL: https://github.com/apache/kafka/pull/14573#issuecomment-1771954831 Ran 3 times of CI build and no fetchRequestTest failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526 ] Justine Olshan commented on KAFKA-15657: [~twmb] Can you confirm if the AddPartitionsToTxn calls are succeeding? And what version they are using? I am concerned the partitions might not be added correctly. > Unexpected errors when producing transactionally in 3.6 > --- > > Key: KAFKA-15657 > URL: https://issues.apache.org/jira/browse/KAFKA-15657 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > > In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD > (which I created a separate issue for), and INVALID_TXN_STATE and > UNKNOWN_SERVER_ERROR. > INVALID_TXN_STATE is being returned even though the partitions have been > added to the transaction (AddPartitionsToTxn). Nothing about the code has > changed between 3.5 and 3.6, and I have loop-integration-tested this code > against 3.5 thousands of times. 3.6 is newly - and always - returning > INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I > eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the > broker logs, the broker indicates that sequence numbers are out of order - > but (a) I am repeating requests that were in order (so something on the > broker got a little haywire maybe? or maybe this is due to me ignoring > invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I > am receiving UNKNOWN_SERVER_ERROR. > I think the main problem is the client unexpectedly receiving > INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to > USE on return for some reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1524#comment-1524 ] Justine Olshan commented on KAFKA-15657: Hey Travis. INVALID_TXN_STATE likely indicates there was a race condition or a bug in the client. In this case, the transaction should abort. This is part of the work of KIP-890. I wonder if there is a bug in the client that caused hanging (or late messages getting through) before and it is just being caught now. If you want to disable transaction verification, you can by setting transaction.partition.verification.enable to false in your server config files. > Unexpected errors when producing transactionally in 3.6 > --- > > Key: KAFKA-15657 > URL: https://issues.apache.org/jira/browse/KAFKA-15657 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > > In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD > (which I created a separate issue for), and INVALID_TXN_STATE and > UNKNOWN_SERVER_ERROR. > INVALID_TXN_STATE is being returned even though the partitions have been > added to the transaction (AddPartitionsToTxn). Nothing about the code has > changed between 3.5 and 3.6, and I have loop-integration-tested this code > against 3.5 thousands of times. 3.6 is newly - and always - returning > INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I > eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the > broker logs, the broker indicates that sequence numbers are out of order - > but (a) I am repeating requests that were in order (so something on the > broker got a little haywire maybe? or maybe this is due to me ignoring > invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I > am receiving UNKNOWN_SERVER_ERROR. 
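For reference, the override mentioned above is a broker-side setting; a sketch of the config change (note this disables the new KIP-890 verification check rather than addressing whatever triggers it):

```properties
# server.properties
# Disable transaction partition verification (enabled by default in 3.6).
transaction.partition.verification.enable=false
```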
> I think the main problem is the client unexpectedly receiving > INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to > USE on return for some reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6
Travis Bischel created KAFKA-15657: -- Summary: Unexpected errors when producing transactionally in 3.6 Key: KAFKA-15657 URL: https://issues.apache.org/jira/browse/KAFKA-15657 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 3.6.0 Reporter: Travis Bischel In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD (which I created a separate issue for), and INVALID_TXN_STATE and UNKNOWN_SERVER_ERROR. INVALID_TXN_STATE is being returned even though the partitions have been added to the transaction (AddPartitionsToTxn). Nothing about the code has changed between 3.5 and 3.6, and I have loop-integration-tested this code against 3.5 thousands of times. 3.6 is newly - and always - returning INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the broker logs, the broker indicates that sequence numbers are out of order - but (a) I am repeating requests that were in order (so something on the broker got a little haywire maybe? or maybe this is due to me ignoring invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I am receiving UNKNOWN_SERVER_ERROR. I think the main problem is the client unexpectedly receiving INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to USE on return for some reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
mumrah commented on code in PR #14545: URL: https://github.com/apache/kafka/pull/14545#discussion_r1366294162 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -146,6 +147,15 @@ void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsum Map> changedPartitions = new HashMap<>(); Map> newPartitions = new HashMap<>(); +Set pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions(); +if (!pendingTopicDeletions.isEmpty()) { +operationConsumer.accept( +DELETE_PENDING_TOPIC_DELETION, +"Delete pending topic deletions", +migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState) Review Comment: Yea, this is in `handleTopicsSnapshot` which is sync'ing the TopicImage to ZK. Really it doesn't need to happen each time when we handle a snapshot, but I figured putting it here was better than having additional one-off logic at migration time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14264) Refactor coordinator code
[ https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14264: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Refactor coordinator code > - > > Key: KAFKA-14264 > URL: https://issues.apache.org/jira/browse/KAFKA-14264 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.4.0 > > > To refactor the consumer, we changed how the coordinator is called. However, > there will be a time period where the old and new implementation need to > coexist, so we will need to override some of the methods and create a new > implementation of the coordinator. In particular: > # ensureCoordinatorReady needs to be non-blocking or we could just use the > sendFindCoordinatorRequest. > # joinGroupIfNeeded needs to be broken up into more fine-grained stages for > the new implementation to work. > We also need to create the coordinator state machine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14468) Refactor Commit Logic
[ https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14468: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Refactor Commit Logic > - > > Key: KAFKA-14468 > URL: https://issues.apache.org/jira/browse/KAFKA-14468 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.5.0 > > > Refactor commit logic using the new multi-threaded coordinator construct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
mumrah commented on code in PR #14545: URL: https://github.com/apache/kafka/pull/14545#discussion_r1366292982 ## core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala: ## @@ -47,8 +47,14 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie if (!interests.contains(TopicVisitorInterest.TOPICS)) { throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.") } -val topics = zkClient.getAllTopicsInCluster() -val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics) +val allTopics = zkClient.getAllTopicsInCluster() +val topicDeletions = readPendingTopicDeletions().asScala +val topicsToMigrated = allTopics -- topicDeletions +if (topicDeletions.nonEmpty) { + warn(s"Found ${topicDeletions.size} pending topic deletions: $topicDeletions. These will be not migrated " + Review Comment: Yea, i wondered about that. What about logging each deletion separately at TRACE ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler for Consumer
[ https://issues.apache.org/jira/browse/KAFKA-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14247: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Implement EventHandler interface and DefaultEventHandler for Consumer > - > > Key: KAFKA-14247 > URL: https://issues.apache.org/jira/browse/KAFKA-14247 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The background thread runs inside the DefaultEventHandler to consume > events from the ApplicationEventQueue and produce events to the > BackgroundEventQueue. > The background thread runnable consists of a loop that tries to poll events > from the ApplicationEventQueue, processes them if there are any, and polls the > networkClient. > In this implementation, the DefaultEventHandler spawns a thread that runs the > BackgroundThreadRunnable. The runnable, as of the current PR, does the > following things: > # Initialize the networkClient > # Poll an ApplicationEvent from the queue if there is any > # Process the event > # Poll the networkClient > PR: https://github.com/apache/kafka/pull/12672 -- This message was sent by Atlassian Jira (v8.20.10#820010)
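The background-thread loop described in KAFKA-14247 can be sketched roughly as follows. This is a minimal illustration only: the class and method names (BackgroundLoopSketch, ApplicationEvent, processEvent, pollNetworkClient) are placeholders, not the actual Kafka client classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the background-thread loop: drain ApplicationEvents
// from a queue, process each one, and poll the network client afterwards.
public class BackgroundLoopSketch {
    record ApplicationEvent(String type) {}

    static final AtomicInteger processed = new AtomicInteger();

    static void processEvent(ApplicationEvent e) { processed.incrementAndGet(); }
    static void pollNetworkClient() { /* network I/O would be driven here */ }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
        applicationEvents.add(new ApplicationEvent("COMMIT"));
        applicationEvents.add(new ApplicationEvent("FETCH"));

        Thread background = new Thread(() -> {
            ApplicationEvent e;
            // Poll an ApplicationEvent from the queue if there is any,
            // process it, then poll the network client.
            while ((e = applicationEvents.poll()) != null) {
                processEvent(e);
                pollNetworkClient();
            }
        });
        background.start();
        background.join(5000);
        System.out.println("processed=" + processed.get());
    }
}
```

In the real client the loop runs continuously rather than exiting when the queue is empty; the sketch only shows the queue-drain/process/poll ordering.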
[jira] [Updated] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case
[ https://issues.apache.org/jira/browse/KAFKA-15270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15270: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Integration tests for AsyncConsumer simple consume case > --- > > Key: KAFKA-15270 > URL: https://issues.apache.org/jira/browse/KAFKA-15270 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-preview > > This task involves writing integration tests for covering the simple consume > functionality of the AsyncConsumer. This should include validation of the > assign, fetch and positions logic. > Not covering any committed offset functionality as part of this task. > Integration tests should have a similar form as the existing > PlaintextConsumerTest, but scoped to the simple consume flow. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
[ https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15306: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Integrate committed offsets logic when updating fetching positions > -- > > Key: KAFKA-15306 > URL: https://issues.apache.org/jira/browse/KAFKA-15306 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > Integrate refreshCommittedOffsets logic, currently performed by the > coordinator, into the update fetch positions performed on every iteration of > the async consumer poll loop. This should rely on the CommitRequestManager to > perform the request based on the refactored model, but it should reuse the > logic for processing the committed offsets and updating the positions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15115: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Implement resetPositions functionality in OffsetsRequestManager > --- > > Key: KAFKA-15115 > URL: https://issues.apache.org/jira/browse/KAFKA-15115 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > Introduce support for resetting positions in the new OffsetsRequestManager. > This task will include a new event for the resetPositions calls performed > from the new consumer, and the logic for handling such events in the > OffsetsRequestManager. > The reset positions implementation will keep the same behaviour as the one in > the old consumer, but adapted to the new threading model. So it is based on a > RESET_POSITIONS event that is submitted to the background thread and then > processed by the ApplicationEventProcessor. The processing itself is done by > the OffsetsRequestManager, given that this will require a LIST_OFFSETS request > for the partitions awaiting reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
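The RESET_POSITIONS flow described above can be sketched as a small event hand-off: the application thread enqueues an event, and the background side hands it to the offsets request manager, which would build the LIST_OFFSETS request. All names here (ResetPositionsSketch, OffsetsRequestManager stand-in, resetPositionsIfNeeded) are illustrative, not the real client API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Toy sketch of the event hand-off between the application thread and the
// background thread's event processor.
public class ResetPositionsSketch {
    enum EventType { RESET_POSITIONS }
    record ApplicationEvent(EventType type, CompletableFuture<String> result) {}

    static class OffsetsManagerStub {
        // In the real manager this would build a LIST_OFFSETS request for
        // the partitions awaiting reset.
        String resetPositionsIfNeeded() { return "LIST_OFFSETS"; }
    }

    public static void main(String[] args) {
        BlockingQueue<ApplicationEvent> queue = new LinkedBlockingQueue<>();
        OffsetsManagerStub manager = new OffsetsManagerStub();

        // Application-thread side: submit the event and keep the future.
        CompletableFuture<String> result = new CompletableFuture<>();
        queue.add(new ApplicationEvent(EventType.RESET_POSITIONS, result));

        // Background-thread side (the event processor's role).
        ApplicationEvent event = queue.poll();
        if (event != null && event.type() == EventType.RESET_POSITIONS) {
            event.result().complete(manager.resetPositionsIfNeeded());
        }
        System.out.println("request=" + result.join());
    }
}
```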
[jira] [Updated] (KAFKA-14950) Implement assign() and assignment()
[ https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14950: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Implement assign() and assignment() > --- > > Key: KAFKA-14950 > URL: https://issues.apache.org/jira/browse/KAFKA-14950 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.6.0 > > > Implement assign() and assignment() -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15163: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Implement validatePositions functionality for new KafkaConsumer > --- > > Key: KAFKA-15163 > URL: https://issues.apache.org/jira/browse/KAFKA-15163 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > Introduce support for validating positions in the new OffsetsRequestManager. > This task will include a new event for the validatePositions calls performed > from the new consumer, and the logic for handling such events in the > OffsetsRequestManager. > The validate positions implementation will keep the same behaviour as the one > in the old consumer, but adapted to the new threading model. So it is based > on a VALIDATE_POSITIONS event that is submitted to the background thread, > and then processed by the ApplicationEventProcessor. The processing itself is > done by the OffsetsRequestManager given that this will require an > OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such > requests in the OffsetsRequestManager, responsible for offset-related requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient
[ https://issues.apache.org/jira/browse/KAFKA-15164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15164: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Extract reusable logic from OffsetsForLeaderEpochClient > --- > > Key: KAFKA-15164 > URL: https://issues.apache.org/jira/browse/KAFKA-15164 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > The OffsetsForLeaderEpochClient class is used for making asynchronous > requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for: > * preparing the requests > * sending them over the network using the network client > * handling the response > The new KafkaConsumer implementation, based on a new threading model, > requires the same logic for preparing the requests and handling the > responses, with different behaviour for how the request is actually sent. > This task includes refactoring OffsetsForLeaderEpochClient by extracting out > the logic for preparing the requests and handling the responses. No changes > in the existing logic, just making the functionality available to be reused. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15174) Ensure the correct thread is executing the callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15174: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Ensure the correct thread is executing the callbacks > > > Key: KAFKA-15174 > URL: https://issues.apache.org/jira/browse/KAFKA-15174 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > We need to add assertion tests to ensure the correct thread is executing the > offset commit callbacks and rebalance callback -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14196: -- Labels: new-consumer-threading-should-fix (was: consumer-threading-refactor new-consumer-threading-should-fix) > Duplicated consumption during rebalance, causing OffsetValidationTest to act > flaky > -- > > Key: KAFKA-14196 > URL: https://issues.apache.org/jira/browse/KAFKA-14196 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: new-consumer-threading-should-fix > Fix For: 3.3.0, 3.2.3 > > > Several flaky tests under OffsetValidationTest are indicating potential > consumer duplication issue, when autocommit is enabled. I believe this is > affecting *3.2* and onward. Below shows the failure message: > > {code:java} > Total consumed records 3366 did not match consumed position 3331 {code} > > After investigating the log, I discovered that the data consumed between the > start of a rebalance event and the async commit was lost for those failing > tests. In the example below, the rebalance event kicks in at around > 1662054846995 (first record), and the async commit of the offset 3739 is > completed at around 1662054847015 (right before partitions_revoked). 
> > {code:java} > {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} > {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} > {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} > {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} > {code} > A few things to note here: > # Manually calling commitSync in the onPartitionsRevoked callback seems to > alleviate the issue > # Setting includeMetadataInTimeout to false also seems to alleviate the > issue. > The above attempts seem to suggest that the contract between poll() and > asyncCommit() is broken. AFAIK, we implicitly use poll() to ack the > previously fetched data, and the consumer would (try to) commit these offsets > in the current poll() loop. However, it seems like as the poll continues to > loop, the "acked" data isn't being committed. > > I believe this could be introduced in KAFKA-14024, which originated from > KAFKA-13310. > More specifically, (see the comments below), the ConsumerCoordinator will > always return before the async commit, due to the previous incomplete commit. > However, this is a bit contradictory here because: > # I think we want to commit asynchronously while the poll continues, and if > we do that, we are back to KAFKA-14024, that the consumer will get rebalance > timeout and get kicked out of the group. 
> # But we also need to commit all the "acked" offsets before revoking the > partition, and this has to be blocked. > *Steps to Reproduce the Issue:* > # Check out AK 3.2 > # Run this several times: (Recommend to only run runs with autocommit > enabled in consumer_test.py to save time) > {code:java} > _DUCKTAPE_OPTIONS="--debug" > TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" > bash tests/docker/run_tests.sh {code} > > *Steps to Diagnose the Issue:* > # Open the test results in *results/* > # Go to the consumer log. It might look like this > > {code:java} > results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY > {code} > 3. Find the docker instance that has partition getting revoked and rejoined. > Observed the offset before and after. > *Propose Fixes:* > TBD > > https://github.com/apache/kafka/pull/12603 -- This message was sent by Atlassian Jira (v8.20.10#820010)
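The duplication window described in KAFKA-14196, and why workaround #1 (a synchronous commit in the revoke callback) closes it, can be shown with a toy simulation. None of this is the real Kafka client API; offsets and names are illustrative, mirroring the log excerpt above.

```java
// Toy model: records consumed after the last completed async commit but
// before the partition is revoked are re-delivered to the new owner,
// unless the revoke callback commits the consumed position synchronously.
public class RevokeCommitSketch {

    static long duplicatesAfterRebalance(boolean commitSyncOnRevoke) {
        long position = 3739;       // consumed position at last async commit
        long committed = position;  // last async commit completed here

        position += 7;              // consumed between rebalance start and revoke

        if (commitSyncOnRevoke) {
            committed = position;   // onPartitionsRevoked: commitSync()
        }
        // The new owner resumes from the committed offset, so everything
        // between `committed` and `position` is consumed twice.
        return position - committed;
    }

    public static void main(String[] args) {
        System.out.println("withoutSyncCommit=" + duplicatesAfterRebalance(false));
        System.out.println("withSyncCommit=" + duplicatesAfterRebalance(true));
    }
}
```

The first case models the flaky-test failure (consumed count exceeding the committed position); the second models the workaround.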
[jira] [Updated] (KAFKA-14252) Create background thread skeleton for new Consumer threading model
[ https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14252: -- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Create background thread skeleton for new Consumer threading model > -- > > Key: KAFKA-14252 > URL: https://issues.apache.org/jira/browse/KAFKA-14252 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The event handler internally instantiates a background thread to consume > ApplicationEvents and produce BackgroundEvents. In this ticket, we will > create a skeleton of the background thread. We will incrementally add > implementation in the future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15656) Frequent INVALID_RECORD on Kafka 3.6
[ https://issues.apache.org/jira/browse/KAFKA-15656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1515#comment-1515 ] Travis Bischel commented on KAFKA-15656: Note that if I change my code to retry on INVALID_RECORD – and repeat the same exact serialization – the produce request will succeed when repeated. > Frequent INVALID_RECORD on Kafka 3.6 > > > Key: KAFKA-15656 > URL: https://issues.apache.org/jira/browse/KAFKA-15656 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 >Reporter: Travis Bischel >Priority: Major > Attachments: invalid_record.log > > > Using this docker-compose.yml: > {noformat} > version: "3.7" > services: > kafka: > image: bitnami/kafka:latest > network_mode: host > environment: > KAFKA_ENABLE_KRAFT: yes > KAFKA_CFG_PROCESS_ROLES: controller,broker > KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER > KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 > KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: > CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT > KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 > # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this > container on localhost via Docker > KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 > KAFKA_CFG_NODE_ID: 1 > ALLOW_PLAINTEXT_LISTENER: yes > KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded > UUID{noformat} > And running franz-go integration tests with KGO_TEST_RF=1, I consistently > receive INVALID_RECORD errors. 
> > Looking at the container logs, I see these problematic log lines: > {noformat} > 2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Invalid negative header key > size -25 > [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Reached end of input stream > before skipping all bytes. Remaining bytes:94 > [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-1 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Found invalid number of > record headers -26 > [2023-10-19 23:33:47,948] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Found invalid number of > record headers -27 > [2023-10-19 23:33:47,950] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-22 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Invalid negative header key > size -25 > [2023-10-19 23:33:47,947] ERROR [ReplicaManager broker=1] Error processing > append operation on partition > c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 > (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Found invalid number of > record headers -50 > [2023-10-19 23:33:47,959] ERROR [ReplicaManager broker=1] Error 
processing > append operation on partition > c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 > (kafka.server.ReplicaManager) > {noformat} > > I modified franz-go with a diff to print the request that was written to the > wire once this error occurs. Attached is a v9 produce request. I deserialized > it locally and am not seeing the corrupt data that Kafka is printing. It's > possible there is a bug in the client, but again, these tests have never > received this error pre-Kafka 3.6. It _looks like_ there is either corruption > when processing the incoming data, or there is some problematic race > condition in the broker - I'm not sure which. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15631: -- Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview (was: kip-848-client-support kip-848-e2e kip-848-preview) > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > The client consumer should not send a new heartbeat request while a previous > one is in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending the next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
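The in-flight guard requested in KAFKA-15631 amounts to a simple state check on each poll iteration. A minimal sketch, with placeholder names (not the actual HeartbeatRequestManager implementation):

```java
// Sketch of the heartbeat in-flight guard: a send attempt is suppressed
// while a previous heartbeat has neither a response nor a timeout.
public class HeartbeatGuardSketch {
    private boolean inFlight = false;
    private int sent = 0;

    // Called on every background-thread poll iteration.
    void maybeSendHeartbeat() {
        if (inFlight) {
            return; // wait for a response or timeout before sending another
        }
        inFlight = true;
        sent++;
    }

    // Called when a response arrives or the request times out.
    void onHeartbeatResponseOrTimeout() {
        inFlight = false;
    }

    public static void main(String[] args) {
        HeartbeatGuardSketch m = new HeartbeatGuardSketch();
        m.maybeSendHeartbeat();
        m.maybeSendHeartbeat(); // suppressed: previous HB still in flight
        m.onHeartbeatResponseOrTimeout();
        m.maybeSendHeartbeat();
        System.out.println("sent=" + m.sent);
    }
}
```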
[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors
[ https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15282: -- Labels: kip-848 kip-848-client-support (was: consumer-threading-refactor kip-848 kip-848-client-support) > Implement client support for KIP-848 client-side assignors > -- > > Key: KAFKA-15282 > URL: https://issues.apache.org/jira/browse/KAFKA-15282 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support > > The client-side assignor provides the logic for the partition assignments > instead of on the server. Client-side assignment is the main approach used by > the “old protocol” for divvying up partitions. While the “new protocol” > favors server-side assignors, the client-side assignor will continue to be > used for backward compatibility, including KSQL, Connect, etc. > Note: I _*think*_ that the client-side assignor logic and the reconciliation > logic can remain separate from each other. We should strive to keep the two > pieces unencumbered, unless it’s unavoidable. > This task includes: > * Validate the client’s configuration for assignor selection > * Integrate with the new {{PartitionAssignor}} interface to invoke the logic > from the user-provided assignor implementation > * Implement the necessary logic around the request/response from the > {{ConsumerGroupPrepareAssignment}} RPC call using the information from the > {{PartitionAssignor}} above > * Implement the necessary logic around the request/response from the > {{ConsumerGroupInstallAssignment}} RPC call, again using the information > calculated by the {{PartitionAssignor}} > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 assignment RPCs
[ https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15279: -- Labels: kip-848 kip-848-client-support (was: consumer-threading-refactor kip-848 kip-848-client-support) > Implement client support for KIP-848 assignment RPCs > > > Key: KAFKA-15279 > URL: https://issues.apache.org/jira/browse/KAFKA-15279 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support > > The protocol introduces three new RPCs that the client uses to communicate > with the broker: > # > [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] > # > [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI] > # > [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI] > Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to > implement the ConsumerGroupAssignmentRequestManager to handle the second and > third RPCs on the above list. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15278: -- Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview) > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} is already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure errors are handled correctly > * Ensure MembershipStateManager is updated on both success and failure > cases, and the state machine is transitioned to the correct state. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()
[ https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15651: -- Labels: consumer-threading-refactor kip-848-preview (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate auto commit guarantees during Consumer.assign() > --- > > Key: KAFKA-15651 > URL: https://issues.apache.org/jira/browse/KAFKA-15651 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > In the {{assign()}} method implementation, both {{KafkaConsumer}} and > {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this > intentional? [~junrao] asks in a [recent PR > review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]: > {quote}Do we guarantee that the new owner of the unsubscribed partitions > could pick up the latest committed offset? > {quote} > Let's confirm whether the asynchronous approach is acceptable and correct. If > it is, great, let's enhance the documentation to briefly explain why. If it > is not, let's correct the behavior if it's within the API semantic > expectations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15548) Ensure all resources created by the Consumer are close()-ed properly
[ https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15548: -- Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview) > Ensure all resources created by the Consumer are close()-ed properly > > > Key: KAFKA-15548 > URL: https://issues.apache.org/jira/browse/KAFKA-15548 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, kip-848-e2e, > kip-848-preview > > Upon closing of the {{Consumer}} we need to: > # Complete pending commits > # Revoke assignment (Note that the revocation involves stop fetching, > committing offsets if auto-commit enabled and invoking the > onPartitionsRevoked callback) > # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the > group (or -2 if static member) > # Close any fetch sessions on the brokers > # Poll the NetworkClient to complete pending I/O > There is a mechanism introduced in PR > [14406|https://github.com/apache/kafka/pull/14406] that allows for performing > network I/O on shutdown. The new method > {{DefaultBackgroundThread.runAtClose()}} will be executed when > {{Consumer.close()}} is invoked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
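The five shutdown steps enumerated in KAFKA-15548 imply a strict ordering for what a close hook might do. The sketch below only illustrates that ordering; the method names are placeholders, not the real DefaultBackgroundThread.runAtClose() implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative ordering of the Consumer shutdown steps: complete commits,
// revoke the assignment, leave the group, close fetch sessions, then poll
// the network client to flush pending I/O.
public class CloseSequenceSketch {
    static final List<String> log = new ArrayList<>();

    static void completePendingCommits()      { log.add("commits"); }
    static void revokeAssignment()            { log.add("revoke"); } // stop fetching, auto-commit, callback
    static void sendLeaveHeartbeat(int epoch) { log.add("leave(epoch=" + epoch + ")"); }
    static void closeFetchSessions()          { log.add("closeFetch"); }
    static void pollPendingIO()               { log.add("poll"); }

    // Per the ticket: epoch -1 leaves the group, -2 for a static member.
    static void runAtClose(boolean staticMember) {
        completePendingCommits();
        revokeAssignment();
        sendLeaveHeartbeat(staticMember ? -2 : -1);
        closeFetchSessions();
        pollPendingIO();
    }

    public static void main(String[] args) {
        runAtClose(false);
        System.out.println(String.join(",", log));
    }
}
```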
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success| > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15652) Add unit/integration tests to verify OffsetOutOfRangeException is thrown for OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15652: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Add unit/integration tests to verify OffsetOutOfRangeException is thrown for > OffsetFetcherUtils.getOffsetResetTimestamp() > - > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{{}poll{}}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{{}OffsetOutOfRangeException{}}}. > {quote} > I intentionally changed the code so that no exceptions were thrown in > {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an > empty map. When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
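The edge case [~junrao] describes can be modeled outside Kafka with a minimal, self-contained sketch: a partition with no default reset policy should surface an error from the timestamp lookup rather than being silently dropped from the result map. All names here ({{ResetTimestampSketch}}, {{offsetResetTimestamps}}, the stand-in {{OffsetOutOfRangeError}}) are illustrative, not Kafka's actual internals:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class ResetTimestampSketch {
    // Stand-in for Kafka's OffsetOutOfRangeException (which lives in the clients jar).
    static class OffsetOutOfRangeError extends RuntimeException {
        OffsetOutOfRangeError(String msg) { super(msg); }
    }

    // Model of the desired behavior: a partition whose reset policy is absent
    // raises an error instead of being skipped, so the application thread sees it.
    static Map<String, Long> offsetResetTimestamps(Map<String, Optional<String>> policies) {
        Map<String, Long> timestamps = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<String>> e : policies.entrySet()) {
            Optional<String> policy = e.getValue();
            if (policy.isEmpty()) {
                throw new OffsetOutOfRangeError("no reset policy for " + e.getKey());
            }
            // earliest -> -2, latest -> -1, mirroring the ListOffsets sentinel timestamps
            timestamps.put(e.getKey(), "earliest".equals(policy.get()) ? -2L : -1L);
        }
        return timestamps;
    }

    public static void main(String[] args) {
        Map<String, Optional<String>> policies = new LinkedHashMap<>();
        policies.put("foo-0", Optional.of("earliest"));
        policies.put("foo-1", Optional.empty()); // no default offset reset policy
        boolean threw = false;
        try {
            offsetResetTimestamps(policies);
        } catch (OffsetOutOfRangeError expected) {
            threw = true;
        }
        System.out.println("threw=" + threw);
    }
}
```

A unit test for the real code would follow the same shape: configure no reset policy, force the out-of-range transition, and assert the exception reaches {{poll()}}.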
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
[ https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15558: -- Labels: consumer-threading-refactor kip-848-preview (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Determine if Timer should be used elsewhere in > PrototypeAsyncConsumer.updateFetchPositions() > > > Key: KAFKA-15558 > URL: https://issues.apache.org/jira/browse/KAFKA-15558 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > This is a followup ticket based on a question from [~junrao] when reviewing > the [fetch request manager pull > request|https://github.com/apache/kafka/pull/14406]: > {quote}It still seems weird that we only use the timer for > {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we > don't have valid fetch positions. For example, if all partitions are in > {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} > will just go in a busy loop, which is not efficient. > {quote} > The goal here is to determine if we should also be propagating the Timer to > the validate positions and reset positions operations. > Note: we should also investigate if the existing {{KafkaConsumer}} > implementation should be fixed, too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > {{expectedBytes}} is calculated as total, instead of avg. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates
[ https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15615: -- Labels: consumer-threading-refactor kip-848-preview (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Improve handling of fetching during metadata updates > > > Key: KAFKA-15615 > URL: https://issues.apache.org/jira/browse/KAFKA-15615 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > [During a review of the new > fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], > [~junrao] found what appears to be an opportunity for optimization. > When a fetch response receives an error about partition leadership, fencing, > etc. a metadata refresh is triggered. However, it takes time for that refresh > to occur, and in the interim, it appears that the consumer will blindly > attempt to fetch data for the partition again, in kind of a "definition of > insanity" type of way. Ideally, the consumer would have a way to temporarily > ignore those partitions, in a way somewhat like the "pausing" approach so > that they are skipped until the metadata refresh response is fully processed. > This affects both the existing KafkaConsumer and the new > PrototypeAsyncConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15534) Propagate client response time when timeout to the request handler
[ https://issues.apache.org/jira/browse/KAFKA-15534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15534: -- Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview) > Propagate client response time when timeout to the request handler > -- > > Key: KAFKA-15534 > URL: https://issues.apache.org/jira/browse/KAFKA-15534 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848, kip-848-e2e, > kip-848-preview > > Currently, we don't have a good way to propagate the response time to the > handler when a timeout is thrown. > {code:java} > unsent.handler.onFailure(new TimeoutException( > "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); > {code} > The current request manager invokes a system call to retrieve the response > time, which is not ideal because it is already available in the network client. > This is an example from the coordinator request manager: > {code:java} > unsentRequest.future().whenComplete((clientResponse, throwable) -> { > long responseTimeMs = time.milliseconds(); > if (clientResponse != null) { > FindCoordinatorResponse response = (FindCoordinatorResponse) > clientResponse.responseBody(); > onResponse(responseTimeMs, response); > } else { > onFailedResponse(responseTimeMs, throwable); > } > }); {code} > But in the networkClientDelegate, we should utilize the currentTimeMs in > trySend() to avoid calling time.milliseconds(): > {code:java} > private void trySend(final long currentTimeMs) { > ... > unsent.handler.onFailure(new TimeoutException( > "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); > continue; > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
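The pattern the ticket asks for — threading the poll loop's {{currentTimeMs}} through to the failure handler instead of re-reading the clock in each request manager — can be modeled with a minimal, self-contained sketch. The class and method names ({{TimestampPropagation}}, {{expireUnsent}}) are illustrative, not Kafka's actual internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class TimestampPropagation {
    // Collects the response times the handlers observed.
    static final List<Long> observed = new ArrayList<>();

    // The timestamp is captured once by the caller and handed to every handler,
    // so no handler needs its own System.currentTimeMillis() call.
    static void expireUnsent(long currentTimeMs, List<LongConsumer> handlers) {
        for (LongConsumer handler : handlers) {
            handler.accept(currentTimeMs);
        }
    }

    public static void main(String[] args) {
        List<LongConsumer> handlers = new ArrayList<>();
        handlers.add(observed::add);
        handlers.add(observed::add);
        long currentTimeMs = 1_000L; // captured once at the top of the poll loop
        expireUnsent(currentTimeMs, handlers);
        System.out.println(observed); // every handler saw the same timestamp
    }
}
```

Besides avoiding the extra system call, this keeps all handlers in one poll iteration agreeing on a single response time, which makes timeout accounting deterministic in tests.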
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15556: -- Labels: consumer-threading-refactor kip-848-preview (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
[ https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15635: -- Labels: consumer-threading-refactor (was: consumer-threading-refactor kip-848-client-support) > Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric > - > > Key: KAFKA-15635 > URL: https://issues.apache.org/jira/browse/KAFKA-15635 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > > Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there > is only 1 assigned partition? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15550: -- Labels: consumer-threading-refactor kip-848 kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-preview) > OffsetsForTimes validation for negative timestamps in new consumer > -- > > Key: KAFKA-15550 > URL: https://issues.apache.org/jira/browse/KAFKA-15550 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848, kip-848-preview > > OffsetsForTimes api call should fail with _IllegalArgumentException_ if > negative timestamps are provided as arguments. This will effectively exclude > earliest and latest offsets as target times, keeping the current behaviour of > the KafkaConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Component/s: unit tests > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15639: -- Summary: Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored (was: Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored) > Investigate ConsumerNetworkThreadTest's > testResetPositionsProcessFailureIsIgnored > - > > Key: KAFKA-15639 > URL: https://issues.apache.org/jira/browse/KAFKA-15639 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > The {{testResetPositionsProcessFailureIsIgnored}} test looks like this: > > {code:java} > @Test > public void testResetPositionsProcessFailureIsIgnored() { > doThrow(new > NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); > ResetPositionsApplicationEvent event = new > ResetPositionsApplicationEvent(); > applicationEventsQueue.add(event); > assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); > > verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); > } > {code} > > [~junrao] asks: > > {quote}Not sure if this is a useful test since > {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly > throw an exception? > {quote} > > I commented out the {{doThrow}} line and it did not impact the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14724: -- Component/s: unit tests > Port tests in FetcherTest to FetchRequestManagerTest > > > Key: KAFKA-14724 > URL: https://issues.apache.org/jira/browse/KAFKA-14724 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task involves copying the relevant tests from {{FetcherTest}} and > modifying them to fit a new unit test named {{{}FetchRequestManagerTest{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15656) Frequent INVALID_RECORD on Kafka 3.6
Travis Bischel created KAFKA-15656: -- Summary: Frequent INVALID_RECORD on Kafka 3.6 Key: KAFKA-15656 URL: https://issues.apache.org/jira/browse/KAFKA-15656 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 3.6.0 Reporter: Travis Bischel Attachments: invalid_record.log Using this docker-compose.yml: {noformat} version: "3.7" services: kafka: image: bitnami/kafka:latest network_mode: host environment: KAFKA_ENABLE_KRAFT: yes KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this container on localhost via Docker KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_CFG_NODE_ID: 1 ALLOW_PLAINTEXT_LISTENER: yes KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID{noformat} And running franz-go integration tests with KGO_TEST_RF=1, I consistently receive INVALID_RECORD errors. Looking at the container logs, I see these problematic log lines: {noformat} 2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing append operation on partition 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-0 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Invalid negative header key size -25 [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing append operation on partition 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Reached end of input stream before skipping all bytes. 
Remaining bytes:94 [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing append operation on partition 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-1 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Found invalid number of record headers -26 [2023-10-19 23:33:47,948] ERROR [ReplicaManager broker=1] Error processing append operation on partition 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Found invalid number of record headers -27 [2023-10-19 23:33:47,950] ERROR [ReplicaManager broker=1] Error processing append operation on partition 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-22 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Invalid negative header key size -25 [2023-10-19 23:33:47,947] ERROR [ReplicaManager broker=1] Error processing append operation on partition c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Found invalid number of record headers -50 [2023-10-19 23:33:47,959] ERROR [ReplicaManager broker=1] Error processing append operation on partition c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 (kafka.server.ReplicaManager) {noformat} I modified franz-go with a diff to print the request that was written to the wire once this error occurs. Attached is a v9 produce request. I deserialized it locally and am not seeing the corrupt data that Kafka is printing. It's possible there is a bug in the client, but again, these tests have never received this error pre-Kafka 3.6. It _looks like_ there is either corruption when processing the incoming data, or there is some problematic race condition in the broker - I'm not sure which. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > {{expectedBytes}} is calculated as total, instead of avg. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success| > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15652) Add unit/integration tests to verify OffsetOutOfRangeException is thrown for OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15652: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Add unit/integration tests to verify OffsetOutOfRangeException is thrown for > OffsetFetcherUtils.getOffsetResetTimestamp() > - > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{{}poll{}}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{{}OffsetOutOfRangeException{}}}. > {quote} > I intentionally changed the code so that no exceptions were thrown in > {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an > empty map. 
When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
[ https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15635: -- Component/s: unit tests Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric > - > > Key: KAFKA-15635 > URL: https://issues.apache.org/jira/browse/KAFKA-15635 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there > is only 1 assigned partition? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Summary: Investigate ConsumerNetworkThreadTest's testPollResultTimer (was: Investigate ConsumerNetworkThread's testPollResultTimer) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Component/s: unit tests > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15277) Design & implement support for internal Consumer delegates
[ https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15277: -- Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview) > Design & implement support for internal Consumer delegates > -- > > Key: KAFKA-15277 > URL: https://issues.apache.org/jira/browse/KAFKA-15277 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848, kip-848-e2e, > kip-848-preview > > As mentioned above, there are presently two different, coexisting > implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and > {{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized > using the delegation pattern. The top-level {{KafkaConsumer}} that implements > the old protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and > {{PrototypeAsyncConsumer}} will be renamed as > {{AsyncKafkaConsumerDelegate}}. It is assumed that neither > {{AsyncKafkaConsumerDelegate}} nor > {{LegacyKafkaConsumerDelegate}} will be top-level implementations > of {{Consumer}}, but will likely implement an internal interface that is > better suited to the needs of the top-level {{KafkaConsumer}}. 
> Provide the Java client support for the consumer delegates, including: > * Create {{ConsumerDelegate}} interface > * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and > refactor to implement {{ConsumerDelegate}} > * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and > refactor to implement the {{ConsumerDelegate}} interface > * Refactor the (original) {{KafkaConsumer}} to remove the core > implementation, instead delegating to the {{{}ConsumerDelegate{}}}, which > will be hard-coded to use {{LegacyKafkaConsumerDelegate}} > * Once available (in KAFKA-15284), use the > {{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15556: -- Component/s: clients consumer > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
[ https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15558: -- Component/s: clients consumer > Determine if Timer should be used elsewhere in > PrototypeAsyncConsumer.updateFetchPositions() > > > Key: KAFKA-15558 > URL: https://issues.apache.org/jira/browse/KAFKA-15558 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > This is a followup ticket based on a question from [~junrao] when reviewing > the [fetch request manager pull > request|https://github.com/apache/kafka/pull/14406]: > {quote}It still seems weird that we only use the timer for > {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we > don't have valid fetch positions. For example, if all partitions are in > {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} > will just go in a busy loop, which is not efficient. > {quote} > The goal here is to determine if we should also be propagating the Timer to > the validate positions and reset positions operations. > Note: we should also investigate if the existing {{KafkaConsumer}} > implementation should be fixed, too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
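The busy-loop concern quoted in KAFKA-15558 can be illustrated with a minimal, self-contained deadline sketch. The names below (`SimpleTimer`, `notExpired`, `remainingMs`) are hypothetical stand-ins for illustration only, not Kafka's internal `Timer` API: the point is that propagating one shared deadline lets the validate-positions and reset-positions paths give up when time runs out instead of spinning.

```java
// Hypothetical sketch (not Kafka's internal Timer API) of the
// deadline-propagation idea: operations that wait for valid fetch
// positions consult one shared deadline instead of looping forever.
public class SimpleTimer {
    private final long deadlineMs;

    public SimpleTimer(long nowMs, long timeoutMs) {
        this.deadlineMs = nowMs + timeoutMs;
    }

    /** True while the deadline has not yet passed. */
    public boolean notExpired(long nowMs) {
        return nowMs < deadlineMs;
    }

    /** Time left to hand to a blocking call; never negative. */
    public long remainingMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}
```

Under this sketch, both the validate and reset operations would receive the same timer instance, so a blocking poll bounded by `remainingMs()` returns once the overall budget is exhausted.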
[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates
[ https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15615: -- Component/s: clients consumer > Improve handling of fetching during metadata updates > > > Key: KAFKA-15615 > URL: https://issues.apache.org/jira/browse/KAFKA-15615 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > [During a review of the new > fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], > [~junrao] found what appears to be an opportunity for optimization. > When a fetch response receives an error about partition leadership, fencing, > etc. a metadata refresh is triggered. However, it takes time for that refresh > to occur, and in the interim, it appears that the consumer will blindly > attempt to fetch data for the partition again, in kind of a "definition of > insanity" type of way. Ideally, the consumer would have a way to temporarily > ignore those partitions, in a way somewhat like the "pausing" approach so > that they are skipped until the metadata refresh response is fully processed. > This affects both the existing KafkaConsumer and the new > PrototypeAsyncConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15551: -- Component/s: clients > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > Conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition positions > should be short-circuited, possibly at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15553) Review committed offset refresh logic
[ https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15553: -- Component/s: clients > Review committed offset refresh logic > - > > Key: KAFKA-15553 > URL: https://issues.apache.org/jira/browse/KAFKA-15553 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > From the existing comment: If there are any partitions which do not have a > valid position and are not awaiting reset, then we need to fetch committed > offsets. > > In the async consumer: I wonder if it would make sense to refresh the > position on the event loop continuously. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250: -- Component/s: clients > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to networkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
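One common fix for this kind of tight loop, sketched below under assumed names (`timeToNextUpdate` is illustrative, not the actual DefaultBackgroundThread code), is to compute the poll timeout as the minimum of the delays reported by the request managers, so the background thread blocks inside the network poll rather than spinning with a zero timeout.

```java
// Hypothetical sketch: derive a blocking-poll timeout from the delays the
// request managers report, clamped to [0, maxTimeoutMs]. A zero result
// means there is work to do now; a positive result lets the thread sleep.
public class PollTimeoutCalculator {
    public static long timeToNextUpdate(long maxTimeoutMs, long... managerDelaysMs) {
        long timeout = maxTimeoutMs;
        for (long delay : managerDelaysMs) {
            // Negative delays mean "overdue", i.e. poll immediately.
            timeout = Math.min(timeout, Math.max(0, delay));
        }
        return timeout;
    }
}
```

The computed value would then be passed to the network poll (e.g. `networkClientDelegate.poll(timeout, nowMs)` in spirit), so CPU is burned only when a manager actually has pending work.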
[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15173: -- Component/s: clients > Consumer event queues should be bounded > --- > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is boundless, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound. > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and see the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
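Question 2 in KAFKA-15173 (what happens when the application event queue fills up) can be prototyped with a capacity-bounded queue that rejects enqueues once full instead of growing without bound. This is an illustrative sketch, not the actual ApplicationEventQueue implementation; the class and method names are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded event queue. When the (arbitrary,
// possibly configurable) capacity is reached, tryEnqueue returns false;
// the caller could instead throw a dedicated "queue full" exception.
public class BoundedEventQueue {
    private final BlockingQueue<String> queue;

    public BoundedEventQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking enqueue; false signals the bound has been reached. */
    public boolean tryEnqueue(String event) {
        return queue.offer(event);
    }

    /** Non-blocking dequeue; null when empty. */
    public String poll() {
        return queue.poll();
    }
}
```

Whether rejection should surface as a return value, an exception to the user, or back-pressure on `poll()` is exactly the design question the ticket raises.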
[jira] [Updated] (KAFKA-15320) Document event queueing patterns
[ https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15320: -- Component/s: clients consumer > Document event queueing patterns > > > Key: KAFKA-15320 > URL: https://issues.apache.org/jira/browse/KAFKA-15320 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > We need to first document the event enqueuing patterns in the > PrototypeAsyncConsumer. As part of this task, determine if it’s > necessary/beneficial to _conditionally_ add events and/or coalesce any > duplicate events in the queue. > _Don’t forget to include diagrams for clarity!_ > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval > - > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15634: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics > > > Key: KAFKA-15634 > URL: https://issues.apache.org/jira/browse/KAFKA-15634 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > What is the point of the code in the initial {{while}} loop since the receive > is delayed and thus there is no {{throttleDelayMs}} received in the client? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set<TopicPartition> partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()
[ https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15651: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate auto commit guarantees during Consumer.assign() > --- > > Key: KAFKA-15651 > URL: https://issues.apache.org/jira/browse/KAFKA-15651 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > In the {{assign()}} method implementation, both {{KafkaConsumer}} and > {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this > intentional? [~junrao] asks in a [recent PR > review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]: > {quote}Do we guarantee that the new owner of the unsubscribed partitions > could pick up the latest committed offset? > {quote} > Let's confirm whether the asynchronous approach is acceptable and correct. If > it is, great, let's enhance the documentation to briefly explain why. If it > is not, let's correct the behavior if it's within the API semantic > expectations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15638: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate ConsumerNetworkThread's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap
[ https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15617: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions > and testInflightFetchOnPendingPartitions overlap > -- > > Key: KAFKA-15617 > URL: https://issues.apache.org/jira/browse/KAFKA-15617 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > In FetcherTest, the two tests testFetchingPendingPartitions and > testInflightFetchOnPendingPartitions have significant overlap. Perhaps the > former subsumes the latter? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates
[ https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15615: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Improve handling of fetching during metadata updates > > > Key: KAFKA-15615 > URL: https://issues.apache.org/jira/browse/KAFKA-15615 > Project: Kafka > Issue Type: Sub-task >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > [During a review of the new > fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], > [~junrao] found what appears to be an opportunity for optimization. > When a fetch response receives an error about partition leadership, fencing, > etc. a metadata refresh is triggered. However, it takes time for that refresh > to occur, and in the interim, it appears that the consumer will blindly > attempt to fetch data for the partition again, in kind of a "definition of > insanity" type of way. Ideally, the consumer would have a way to temporarily > ignore those partitions, in a way somewhat like the "pausing" approach so > that they are skipped until the metadata refresh response is fully processed. > This affects both the existing KafkaConsumer and the new > PrototypeAsyncConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15636: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > {{expectedBytes}} is calculated as total, instead of avg. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
[ https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15558: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Determine if Timer should be used elsewhere in > PrototypeAsyncConsumer.updateFetchPositions() > > > Key: KAFKA-15558 > URL: https://issues.apache.org/jira/browse/KAFKA-15558 > Project: Kafka > Issue Type: Sub-task >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > This is a followup ticket based on a question from [~junrao] when reviewing > the [fetch request manager pull > request|https://github.com/apache/kafka/pull/14406]: > {quote}It still seems weird that we only use the timer for > {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we > don't have valid fetch positions. For example, if all partitions are in > {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} > will just go in a busy loop, which is not efficient. > {quote} > The goal here is to determine if we should also be propagating the Timer to > the validate positions and reset positions operations. > Note: we should also investigate if the existing {{KafkaConsumer}} > implementation should be fixed, too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded
[ https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15637: -- Labels: consumer-threading-refactor kip-848-client-support kip-848-preview (was: consumer-threading-refactor) > Investigate FetcherTest's/FetchRequestManager's > testFetchCompletedBeforeHandlerAdded > > > Key: KAFKA-15637 > URL: https://issues.apache.org/jira/browse/KAFKA-15637 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > Thanks for the reply. I still don't quite understand the test. Why do we > duplicate the following code both inside and outside of {{{}setWakeupHook{}}}? > > {code:java} > networkClientDelegate.disconnectAsync(readReplica); > networkClientDelegate.poll(time.timer(0)); > {code} > > MockClient is only woken up through > {{{}networkClientDelegate.disconnectAsync{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)