[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within several minutes and one consumer reads the data from that partition.

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within several minutes and one consumer reads the data from that partition.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
dajac commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1222426539 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. + // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on + // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append. + // There are two phases -- the first append to the log and subsequent appends. + // + // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and + // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction + // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could + // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker, + // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not + // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2. + // + // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. 
This field remains until the + // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still + // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and + // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. + if (!hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard)) Review Comment: Is it safe to call hasOngoingTransaction when the batch is not transactional? It may be better to keep checking batch.isTransactional first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
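The ordering concern raised in the review can be sketched as follows. This is a hypothetical Java stand-in, not Kafka's actual `UnifiedLog` code (which is Scala): short-circuiting on the batch's transactional flag first guarantees that per-producer transaction state is never consulted for non-transactional batches, which is the ordering the reviewer suggests keeping.

```java
public class VerificationCheckSketch {
    // Hypothetical stand-in for UnifiedLog.hasOngoingTransaction: pretend
    // only producerId 1 currently has an ongoing transaction.
    static boolean hasOngoingTransaction(long producerId) {
        return producerId == 1L;
    }

    // Hypothetical stand-in for the "missing required verification" check:
    // a transactional batch with no verification guard fails verification.
    static boolean batchMissingRequiredVerification(boolean isTransactional, Object verificationGuard) {
        return isTransactional && verificationGuard == null;
    }

    // Check isTransactional first, so non-transactional batches short-circuit
    // before any transaction state is consulted.
    static boolean shouldReject(boolean isTransactional, long producerId, Object verificationGuard) {
        return isTransactional
                && !hasOngoingTransaction(producerId)
                && batchMissingRequiredVerification(isTransactional, verificationGuard);
    }

    public static void main(String[] args) {
        // A non-transactional batch is never rejected.
        if (shouldReject(false, 2L, null)) throw new AssertionError();
        // A transactional batch with no ongoing transaction and no guard is rejected.
        if (!shouldReject(true, 2L, null)) throw new AssertionError();
        // A transactional batch whose producer has an ongoing transaction is accepted.
        if (shouldReject(true, 1L, null)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The sketch only illustrates the short-circuit ordering; the real method names, guard object, and error path are whatever the PR defines.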
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.
[jira] [Comment Edited] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730365#comment-17730365 ]

YaYun Wang edited comment on KAFKA-15074 at 6/8/23 3:30 AM:

*And here are the debug logs of kafka-client:*

2023-06-06 09:12:59.576+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer clientId=consumer-group-1, groupId=group] Committed offset 42574009 for partition myTopic-4
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-1 at position FetchPosition{offset=2776251, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-7 at position FetchPosition{offset=1678666, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer clientId=consumer-group-1, groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(myTopic-1, myTopic-7), canUseTopicIds=True) to broker host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.963+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer clientId=consumer-group-1, groupId=group] Committed offset 42574109 for partition myTopic-4
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-8 at position FetchPosition{offset=2559637, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-0 at position FetchPosition{offset=1678355, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-3 at position FetchPosition{offset=2773458, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=2171}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-2 at position FetchPosition{offset=1677672, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-5 at position FetchPosition{offset=1678870, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=1999}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-6 at position FetchPosition{offset=1994802, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener.

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener, in a case where the producer publishes 1,000,000 (100W) records to partition 4 within one minute and one consumer reads the data from that partition.
[jira] [Commented] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730365#comment-17730365 ]

YaYun Wang commented on KAFKA-15074:

And here are my debug logs of kafka-client:

2023-06-06 09:12:59.576+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer clientId=consumer-group-1, groupId=group] Committed offset 42574009 for partition myTopic-4
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-1 at position FetchPosition{offset=2776251, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-7 at position FetchPosition{offset=1678666, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer clientId=consumer-group-1, groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(myTopic-1, myTopic-7), canUseTopicIds=True) to broker host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.963+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer clientId=consumer-group-1, groupId=group] Committed offset 42574109 for partition myTopic-4
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-8 at position FetchPosition{offset=2559637, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-0 at position FetchPosition{offset=1678355, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-3 at position FetchPosition{offset=2773458, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=2171}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-2 at position FetchPosition{offset=1677672, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-5 at position FetchPosition{offset=1678870, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: cn-north-1a)], epoch=1999}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request for partition myTopic-6 at position FetchPosition{offset=1994802, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer clientId=consumer-group-1, g
[jira] [Commented] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730360#comment-17730360 ]

YaYun Wang commented on KAFKA-15074:

*Here is my code:*

@KafkaListener(topics = "myTopic", containerFactory = "myContainerFactory")
public void consume(ConsumerRecord consumerRecord) throws ExecutionException, InterruptedException {
    String recordValue = consumerRecord.value();
    // do something with recordValue
}

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener.
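For context on the reported message: when a consumer's fetch position falls outside the broker's available offset range (for example, after retention deletes old segments while the consumer lags), the Kafka consumer resets according to its `auto.offset.reset` configuration. A minimal sketch of the relevant consumer settings, using plain `java.util.Properties` with placeholder broker address and group id (not the reporter's actual configuration):

```java
import java.util.Properties;

public class ConsumerResetConfigSketch {
    // Build the consumer settings relevant to an offset-out-of-range reset.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093"); // placeholder
        props.put("group.id", "group");              // placeholder
        // "auto.offset.reset" decides where the consumer resumes when its
        // position is out of range: "latest" (the default) skips ahead to the
        // log end, "earliest" rewinds to the log start, and "none" surfaces
        // an error to the application instead of silently resetting.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```

With the default `latest`, a lagging consumer whose committed offset has been deleted jumps forward and silently skips records, which matches the "resetting offset" log line in the report.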
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener:

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener.

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener.
[jira] [Created] (KAFKA-15074) offset out of range for partition xxx, resetting offset
YaYun Wang created KAFKA-15074:
--
Summary: offset out of range for partition xxx, resetting offset
Key: KAFKA-15074
URL: https://issues.apache.org/jira/browse/KAFKA-15074
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 3.3.2
Reporter: YaYun Wang

I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consume Kafka through @KafkaListener.
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1222332377 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class RocksDBTimeOrderedKeyValueBufferTest { +public RocksDBTimeOrderedKeyValueBuffer buffer; +@Mock +public SerdeGetter serdeGetter; +public InternalProcessorContext context; +public StreamsMetricsImpl streamsMetrics; +@Mock +public Sensor sensor; +public long offset; + +@Before +public void setUp() { +when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); +when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); +final Metrics metrics = new Metrics(); +offset = 0; +streamsMetrics = 
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); +context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); +} + +public void createJoin(final Duration grace) { +final RocksDBTimeOrderedKeyValueSegmentedBytesStore store = new RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get(); Review Comment: For testing, this was mostly just me picking a number I was sure would work. Setting it to the grace period or higher would be the correct choice outside of a test.
[GitHub] [kafka] mumrah opened a new pull request, #13827: KAFKA-15073: Add a Github action to mark PRs as stale
mumrah opened a new pull request, #13827: URL: https://github.com/apache/kafka/pull/13827

This patch uses a GitHub workflow to mark inactive PRs with the `stale` label. The workflow uses the ["stale" action](https://github.com/actions/stale), which has many features, including a number of filtering options and the ability to auto-close issues. For this patch, I've disabled auto-closing PRs and just have it add the label to PRs with more than 90 days of inactivity. I've also enabled the "dry-run" option while we discuss the configuration.

The workflow will run nightly at 3:30 UTC. It can also be triggered manually.
[jira] [Created] (KAFKA-15073) Automation for old/inactive PRs
David Arthur created KAFKA-15073:
Summary: Automation for old/inactive PRs
Key: KAFKA-15073
URL: https://issues.apache.org/jira/browse/KAFKA-15073
Project: Kafka
Issue Type: Improvement
Components: build
Reporter: David Arthur

Following from a discussion on the mailing list, it would be nice to automatically triage inactive PRs. There are currently over 1,000 open PRs. Most likely, a majority of these will never be merged and should be closed.
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1222312423 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.runtime.Coordinator; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; + +/** + * The group coordinator replicated state machine that manages the metadata of all generic and + * consumer groups. It holds the hard and the soft state of the groups. 
This class has two kinds + * of methods: + * 1) The request handlers which handle the requests and generate a response and records to + *mutate the hard state. Those records will be written by the runtime and applied to the + *hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + *handling as well as during the initial loading of the records from the partitions. + */ +public class ReplicatedGroupCoordinator implements Coordinator { Review Comment: Is this called replicated because we replicate the state? (In other words, this is the implementation to get the hard state we already have for the current group coordinator) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
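The handler/replay split that the quoted javadoc describes can be sketched roughly as follows. This is an illustrative toy, not the actual coordinator API: `SketchCoordinator`, `Result`, and the string "records" are stand-ins for the generated record types and runtime plumbing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the two kinds of methods the javadoc describes:
// 1) handlers that produce a response plus records, and
// 2) replay methods that apply committed records to the hard state.
class SketchCoordinator {
    private final List<String> hardState = new ArrayList<>(); // stands in for timeline data structures

    // 1) Request handler: computes a response and the records to write.
    //    The records are NOT applied here; the runtime writes them to the
    //    log first and only then replays them.
    Result handle(String request) {
        List<String> records = List.of("member-joined:" + request);
        return new Result("ok", records);
    }

    // 2) Replay: applies a committed record to the hard state. Used both
    //    after a successful write and when loading the partition at startup.
    void replay(String record) {
        hardState.add(record);
    }

    int size() { return hardState.size(); }

    record Result(String response, List<String> records) {}
}
```

The point of the split is that the same `replay` path serves both live request handling and recovery from the log, so the in-memory state can never diverge from what was durably written.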
[GitHub] [kafka] jolshan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
jolshan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1581669973 > No, we can't - it all depends on the retries and the delivery timeout. Do you think that causes a problem? I guess I just need to clarify what retried batches are here -- is the idea that we wait for in-flight batches to return a response or time out? What if the response triggers another retry? Would we prevent that from being sent out? I'm also wondering about the benefit of preserving the previous batches if there is an error. How does the system recover differently if we allow those batches to "complete"? I think we could run into cases where the error causes the in-flight batches to be unable to be written. Do we prefer to fail them (what we may do now) and start clean, or try to write them with new sequences? I can see both scenarios causing issues. I guess it boils down to availability of writes (rewriting the sequences allows us to continue writing) versus idempotency correctness (trying to wait for them to complete with their old sequences). The sticking point I'm running into is why getting those extra in-flight requests (potentially) written is better if we've hit a non-retriable error. Maybe I just need an example :)
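A toy illustration of the tradeoff discussed above. This is not the real Sender/TransactionManager logic; it only shows what "rewriting the sequences" (the availability option) means for an idempotent producer's per-partition sequence numbers, with `SequenceSketch` and its methods invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of an idempotent producer's per-partition state. The broker
// accepts a batch only if its sequence is exactly lastAcked + 1, so a fatal
// error on one in-flight batch strands the batches queued behind it.
class SequenceSketch {
    private short epoch = 0;
    private int nextSeq = 0;
    private final Deque<Integer> inflight = new ArrayDeque<>();

    void send() { inflight.add(nextSeq++); }

    // Availability option: give up on the stranded batches' old sequences,
    // bump the epoch, and renumber them from 0 so new writes can proceed.
    // The idempotency-correctness option would instead keep the old
    // sequences and wait for the in-flight batches to resolve.
    void bumpEpochAndRenumber() {
        epoch++;
        nextSeq = 0;
        int n = inflight.size();
        inflight.clear();
        for (int i = 0; i < n; i++) send(); // same payloads, fresh sequences
    }

    short epoch() { return epoch; }
    Deque<Integer> inflight() { return inflight; }
}
```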
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581658284 > I'm not sure I follow. Are you saying that the test would set the thread local before running? So a single thread may set itself back and forth within the suite? (If you have a commit with the change, that may also help me understand) I have updated the PR with the `ThreadLocal`-based approach. This approach results in far fewer changes to the surrounding classes. Please take a look when you have time. Thanks!
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581633134 > Are you saying that the test would set the thread local before running? So a single thread may set itself back and forth within the suite? Precisely. That's in effect what it does now in the tests: either `SENDER` or `APPLICATION` is passed in to the various `TransactionManager` calls.
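The `ThreadLocal`-based marking being discussed might look roughly like this. The names (`CallerType`, `CallerContext`) are illustrative, not necessarily what the PR uses; the idea is that each thread tags itself as the sender thread or an application thread, and the transaction manager consults the tag to decide how severe an illegal-state error should be.

```java
// Hypothetical sketch: each thread marks itself SENDER or APPLICATION via a
// ThreadLocal, so downstream code can branch on the caller type without
// threading an extra parameter through every method signature.
enum CallerType { APPLICATION, SENDER }

class CallerContext {
    private static final ThreadLocal<CallerType> CALLER =
        ThreadLocal.withInitial(() -> CallerType.APPLICATION);

    static void set(CallerType type) { CALLER.set(type); }
    static CallerType get() { return CALLER.get(); }
}
```

Under this scheme a test thread can indeed "set itself back and forth within the suite" by calling `CallerContext.set(...)` before exercising the sender-side or application-side path.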
[GitHub] [kafka] pjpringle commented on pull request #12540: bug: State stores lose state when tasks are reassigned under EOS wit…
pjpringle commented on PR #12540: URL: https://github.com/apache/kafka/pull/12540#issuecomment-1581567216 Is this bug EOS-specific, given the title? Looking at the fix, I don't believe it is. I have been seeing similar stale-state issues on rebalance in a non-EOS setup, but with standby replicas configured.
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581564571 > No, the test method would need to specifically mark its thread as wanting to poison the thread or not before running the rest of the test. In the TransactionManagerTest, there was only one place I had to specifically call it to get the tests to pass. I'm not sure I follow. Are you saying that the test would set the thread local before running? So a single thread may set itself back and forth within the suite? (If you have a commit with the change, that may also help me understand)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r107444 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class RocksDBTimeOrderedKeyValueBufferTest { +public RocksDBTimeOrderedKeyValueBuffer buffer; +@Mock +public SerdeGetter serdeGetter; +public InternalProcessorContext context; +public StreamsMetricsImpl streamsMetrics; +@Mock +public Sensor sensor; +public long offset; + +@Before +public void setUp() { +when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); +when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); +final Metrics metrics = new Metrics(); +offset = 0; +streamsMetrics = 
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); +context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); +} + +public void createJoin(final Duration grace) { +final RocksDBTimeOrderedKeyValueSegmentedBytesStore store = new RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get(); Review Comment: What's the relationship between the segments store retention period (100, currently) and the buffer's grace period (`grace`)? Any reason they shouldn't be the same?
[jira] [Updated] (KAFKA-15060) Fix the ApiVersionManager interface
[ https://issues.apache.org/jira/browse/KAFKA-15060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15060: - Description: (was: Fix Admin.describeFeatures, which was accidentally broken by KAFKA-15007.) > Fix the ApiVersionManager interface > --- > > Key: KAFKA-15060 > URL: https://issues.apache.org/jira/browse/KAFKA-15060 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe opened a new pull request, #13826: KAFKA-15060: fix the ApiVersionManager interface
cmccabe opened a new pull request, #13826: URL: https://github.com/apache/kafka/pull/13826 This PR expands the scope of ApiVersionManager a bit to include returning the current MetadataVersion and features that are in effect. This is useful in general because that information needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager interface so that all subclasses implement all methods of the interface. Having subclasses that don't implement some methods is dangerous because they could cause exceptions at runtime in unexpected scenarios. On the KRaft controller, we were previously performing a read operation in the QuorumController thread to get the current metadata version and features. With this PR, we now read a volatile variable maintained by a separate MetadataVersionContextPublisher object. This will improve performance and simplify the code. It should not change the guarantees we are providing; in both the old and new scenarios, we need to be robust against version skew scenarios during updates.
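The read-path change the PR describes (replacing a controller-thread read with a volatile variable maintained by a publisher) is a standard publication pattern; a minimal sketch, with `FinalizedFeatures` and the method names simplified from the PR description:

```java
// Sketch of publishing the current metadata version through a volatile field
// so request-handler threads can read it without a round trip through the
// controller thread. FinalizedFeatures stands in for the real type.
class FinalizedFeatures {
    final int metadataVersion;
    FinalizedFeatures(int metadataVersion) { this.metadataVersion = metadataVersion; }
}

class MetadataVersionPublisher {
    // volatile: a write by the metadata-loading thread is immediately visible
    // to all reader threads, with no locking on the read path.
    private volatile FinalizedFeatures current = new FinalizedFeatures(0);

    // Called from the metadata-update path whenever new metadata is applied.
    void onMetadataUpdate(FinalizedFeatures features) { current = features; }

    // Called from request handlers, e.g. when building an ApiVersionsResponse.
    FinalizedFeatures features() { return current; }
}
```

Because the whole snapshot is swapped atomically as one reference, readers always see a consistent `FinalizedFeatures` object, though possibly a slightly stale one, which matches the PR's note about tolerating version skew during updates.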
[jira] [Updated] (KAFKA-15060) Fix the ApiVersionManager interface
[ https://issues.apache.org/jira/browse/KAFKA-15060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15060: - Summary: Fix the ApiVersionManager interface (was: Fix Admin.describeFeatures) > Fix the ApiVersionManager interface > --- > > Key: KAFKA-15060 > URL: https://issues.apache.org/jira/browse/KAFKA-15060 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > > Fix Admin.describeFeatures, which was accidentally broken by KAFKA-15007. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r102869 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashSet; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CoordinatorRuntimeTest { +private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0); + +/** + * An CoordinatorEventProcessor that directly executes the operations. This is + * useful in unit tests where execution in threads is not required. 
+ */ +private static class MockEventProcessor implements CoordinatorEventProcessor { +@Override +public void enqueue(CoordinatorEvent event) throws RejectedExecutionException { +try { +event.run(); +} catch (Throwable ex) { +event.complete(ex); +} +} + +@Override +public void close() throws Exception {} +} + +/** + * A CoordinatorLoader that always succeeds. + */ +private static class MockCoordinatorLoader implements CoordinatorLoader { +@Override +public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { +return CompletableFuture.completedFuture(null); +} +} + +/** + * An in-memory partition writer that accepts a maximum number of writes. + */ +private static class MockPartitionWriter extends InMemoryPartitionWriter { +private int allowedWrites = 1; + +public MockPartitionWriter() { +this(Integer.MAX_VALUE); +} + +public MockPartitionWriter(int allowedWrites) { +super(false); +this.allowedWrites = allowedWrites; +} + +@Override +public void registerListener(TopicPartition tp, Listener listener) { +super.registerListener(tp, listener); +} + +@Override +public void deregisterListener(TopicPartition tp, Listener listener) { +super.deregisterListener(tp, listener); +} + +@Override +public long append(TopicPartition tp, List records) throws KafkaException { +if (allowedWrites-- > 0) { +return super.append(tp, records); +} else { +throw new KafkaException("append failed."); +} +} +} + +/** + * A simple Coordinator implementation that stores the records into a set. + */ +private static class MockCoordinator implements Coordinator { +private final TimelineHashSet records; + +MockCoordinator( +SnapshotRegistry snapshotRegistry +) { +records = new TimelineHashSet<>(snapshotRegistry,
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222198170 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashSet; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CoordinatorRuntimeTest { +private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0); + +/** + * An CoordinatorEventProcessor that directly executes the operations. This is + * useful in unit tests where execution in threads is not required. 
+ */ +private static class MockEventProcessor implements CoordinatorEventProcessor { +@Override +public void enqueue(CoordinatorEvent event) throws RejectedExecutionException { +try { +event.run(); +} catch (Throwable ex) { +event.complete(ex); +} +} + +@Override +public void close() throws Exception {} +} + +/** + * A CoordinatorLoader that always succeeds. + */ +private static class MockCoordinatorLoader implements CoordinatorLoader { +@Override +public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { +return CompletableFuture.completedFuture(null); +} +} + +/** + * An in-memory partition writer that accepts a maximum number of writes. + */ +private static class MockPartitionWriter extends InMemoryPartitionWriter { +private int allowedWrites = 1; + +public MockPartitionWriter() { +this(Integer.MAX_VALUE); +} + +public MockPartitionWriter(int allowedWrites) { +super(false); +this.allowedWrites = allowedWrites; +} + +@Override +public void registerListener(TopicPartition tp, Listener listener) { +super.registerListener(tp, listener); +} + +@Override +public void deregisterListener(TopicPartition tp, Listener listener) { +super.deregisterListener(tp, listener); +} + +@Override +public long append(TopicPartition tp, List records) throws KafkaException { +if (allowedWrites-- > 0) { +return super.append(tp, records); +} else { +throw new KafkaException("append failed."); +} +} +} + +/** + * A simple Coordinator implementation that stores the records into a set. + */ +private static class MockCoordinator implements Coordinator { +private final TimelineHashSet records; + +MockCoordinator( +SnapshotRegistry snapshotRegistry +) { +records = new TimelineHashSet<>(snapshotRegistry,
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222177376 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashSet; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CoordinatorRuntimeTest { +private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0); + +/** + * An CoordinatorEventProcessor that directly executes the operations. This is + * useful in unit tests where execution in threads is not required. 
+ */ +private static class MockEventProcessor implements CoordinatorEventProcessor { +@Override +public void enqueue(CoordinatorEvent event) throws RejectedExecutionException { +try { +event.run(); +} catch (Throwable ex) { +event.complete(ex); +} +} + +@Override +public void close() throws Exception {} +} + +/** + * A CoordinatorLoader that always succeeds. + */ +private static class MockCoordinatorLoader implements CoordinatorLoader { +@Override +public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { +return CompletableFuture.completedFuture(null); +} +} + +/** + * An in-memory partition writer that accepts a maximum number of writes. + */ +private static class MockPartitionWriter extends InMemoryPartitionWriter { +private int allowedWrites = 1; + +public MockPartitionWriter() { +this(Integer.MAX_VALUE); +} + +public MockPartitionWriter(int allowedWrites) { +super(false); +this.allowedWrites = allowedWrites; +} + +@Override +public void registerListener(TopicPartition tp, Listener listener) { +super.registerListener(tp, listener); +} + +@Override +public void deregisterListener(TopicPartition tp, Listener listener) { +super.deregisterListener(tp, listener); +} + +@Override +public long append(TopicPartition tp, List records) throws KafkaException { +if (allowedWrites-- > 0) { +return super.append(tp, records); +} else { +throw new KafkaException("append failed."); +} +} +} + +/** + * A simple Coordinator implementation that stores the records into a set. + */ +private static class MockCoordinator implements Coordinator { +private final TimelineHashSet records; + +MockCoordinator( +SnapshotRegistry snapshotRegistry +) { +records = new TimelineHashSet<>(snapshotRegistry,
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222175645 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashSet; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CoordinatorRuntimeTest { +private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0); + +/** + * An CoordinatorEventProcessor that directly executes the operations. This is + * useful in unit tests where execution in threads is not required. 
+ */ +private static class MockEventProcessor implements CoordinatorEventProcessor { +@Override +public void enqueue(CoordinatorEvent event) throws RejectedExecutionException { +try { +event.run(); +} catch (Throwable ex) { +event.complete(ex); +} +} + +@Override +public void close() throws Exception {} +} + +/** + * A CoordinatorLoader that always succeeds. + */ +private static class MockCoordinatorLoader implements CoordinatorLoader { +@Override +public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { +return CompletableFuture.completedFuture(null); +} +} + +/** + * An in-memory partition writer that accepts a maximum number of writes. + */ +private static class MockPartitionWriter extends InMemoryPartitionWriter { +private int allowedWrites = 1; + +public MockPartitionWriter() { +this(Integer.MAX_VALUE); +} + +public MockPartitionWriter(int allowedWrites) { +super(false); +this.allowedWrites = allowedWrites; +} + +@Override +public void registerListener(TopicPartition tp, Listener listener) { +super.registerListener(tp, listener); +} + +@Override +public void deregisterListener(TopicPartition tp, Listener listener) { +super.deregisterListener(tp, listener); +} + +@Override +public long append(TopicPartition tp, List records) throws KafkaException { +if (allowedWrites-- > 0) { +return super.append(tp, records); +} else { +throw new KafkaException("append failed."); +} +} +} + +/** + * A simple Coordinator implementation that stores the records into a set. + */ +private static class MockCoordinator implements Coordinator { +private final TimelineHashSet records; + +MockCoordinator( +SnapshotRegistry snapshotRegistry +) { +records = new TimelineHashSet<>(snapshotRegistry,
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222169738

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CoordinatorRuntimeTest {
+    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
+
+    /**
+     * An CoordinatorEventProcessor that directly executes the operations. This is
+     * useful in unit tests where execution in threads is not required.
+     */
+    private static class MockEventProcessor implements CoordinatorEventProcessor {
+        @Override
+        public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
+            try {
+                event.run();
+            } catch (Throwable ex) {
+                event.complete(ex);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    /**
+     * A CoordinatorLoader that always succeeds.
+     */
+    private static class MockCoordinatorLoader implements CoordinatorLoader {
+        @Override
+        public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /**
+     * An in-memory partition writer that accepts a maximum number of writes.
+     */
+    private static class MockPartitionWriter extends InMemoryPartitionWriter {
+        private int allowedWrites = 1;
+
+        public MockPartitionWriter() {
+            this(Integer.MAX_VALUE);
+        }
+
+        public MockPartitionWriter(int allowedWrites) {
+            super(false);
+            this.allowedWrites = allowedWrites;
+        }
+
+        @Override
+        public void registerListener(TopicPartition tp, Listener listener) {
+            super.registerListener(tp, listener);
+        }
+
+        @Override
+        public void deregisterListener(TopicPartition tp, Listener listener) {
+            super.deregisterListener(tp, listener);
+        }
+
+        @Override
+        public long append(TopicPartition tp, List records) throws KafkaException {
+            if (allowedWrites-- > 0) {
+                return super.append(tp, records);
+            } else {
+                throw new KafkaException("append failed.");
+            }
+        }
+    }
+
+    /**
+     * A simple Coordinator implementation that stores the records into a set.
+     */
+    private static class MockCoordinator implements Coordinator {
+        private final TimelineHashSet records;
+
+        MockCoordinator(
+            SnapshotRegistry snapshotRegistry
+        ) {
+            records = new TimelineHashSet<>(snapshotRegistry,
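The MockEventProcessor quoted above executes each enqueued event synchronously on the caller's thread, routing any thrown Throwable into the event's completion path instead of letting it escape. A minimal standalone sketch of that inline-executor pattern; the Event interface and class names below are illustrative stand-ins, not Kafka's actual types:

```java
import java.util.ArrayList;
import java.util.List;

public class InlineProcessorDemo {
    // Illustrative stand-in for CoordinatorEvent: run() does the work,
    // complete(ex) records the failure instead of propagating it.
    interface Event {
        void run();
        void complete(Throwable ex);
    }

    // Synchronous processor: no threads involved, so unit tests stay deterministic.
    static class InlineProcessor {
        void enqueue(Event event) {
            try {
                event.run();
            } catch (Throwable ex) {
                event.complete(ex);
            }
        }
    }

    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        InlineProcessor processor = new InlineProcessor();
        // A successful event simply runs.
        processor.enqueue(new Event() {
            public void run() { log.add("ran"); }
            public void complete(Throwable ex) { log.add("failed: " + ex.getMessage()); }
        });
        // A failing event is completed exceptionally rather than crashing the caller.
        processor.enqueue(new Event() {
            public void run() { throw new IllegalStateException("boom"); }
            public void complete(Throwable ex) { log.add("failed: " + ex.getMessage()); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());  // prints [ran, failed: boom]
    }
}
```

Because events run inline, a test can enqueue a write and immediately assert on its outcome, without waiting on another thread.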
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222158858

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

(quotes the same CoordinatorRuntimeTest snippet as the comment above)
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222148781

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

(quotes the same CoordinatorRuntimeTest snippet as the comment above)
[GitHub] [kafka] junrao commented on a diff in pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic
junrao commented on code in PR #13815:
URL: https://github.com/apache/kafka/pull/13815#discussion_r1222142092

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java: ##

@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Utility functions for fetching offsets, validating and resetting positions.
+ */
+class OffsetFetcherUtils {
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptionState;
+    private final Time time;
+    private final ApiVersions apiVersions;
+    private final Logger log;
+
+    private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>();
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    OffsetFetcherUtils(LogContext logContext,
+                       ConsumerMetadata metadata,
+                       SubscriptionState subscriptionState,
+                       Time time,
+                       ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.metadata = metadata;
+        this.subscriptionState = subscriptionState;
+        this.time = time;
+        this.apiVersions = apiVersions;
+    }
+
+    /**
+     * Callback for the response of the list offset call.
+     *
+     * @param listOffsetsResponse The response from the server.
+     * @return {@link OffsetFetcherUtils.ListOffsetResult} extracted from the response, containing the fetched offsets
+     *         and partitions to retry.
+     */
+    OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse) {
+        Map fetchedOffsets = new HashMap<>();
+        Set partitionsToRetry = new HashSet<>();
+        Set unauthorizedTopics = new HashSet<>();
+
+        for (ListOffsetsResponseData.ListOffsetsTopicResponse topic : listOffsetsResponse.topics()) {
+            for (ListOffsetsResponseData.ListOffsetsPartitionResponse partition : topic.partitions()) {
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
+                Errors error = Errors.forCode(partition.errorCode());
+                switch (error) {
+                    case NONE:
+                        if (!partition.oldStyleOffsets().isEmpty()) {
+                            // Handle v0 response with offsets
+                            long offset;
+                            if (partition.oldStyleOffsets().size() > 1) {
+                                throw new IllegalStateException("Unexpected partitionData response of length " +
+                                        partition.oldStyleOffsets().size());
+                            } else {
+                                offset = partition.oldStyleOffsets().get(0);
+                            }
+                            log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
+                                    topicPartition, offset);
+                            if (offset != ListOffsetsResponse
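The handleListOffsetResponse method quoted above triages every partition of the response by its error code: successful partitions contribute a fetched offset, retriable errors are collected into a retry set, and authorization failures into unauthorizedTopics. A simplified, self-contained sketch of that triage pattern; the enum and result types below are placeholder stand-ins invented for illustration, not Kafka's real ListOffsetsResponseData types:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OffsetTriageDemo {
    // Placeholder subset of Kafka's Errors enum.
    enum Error { NONE, LEADER_NOT_AVAILABLE, TOPIC_AUTHORIZATION_FAILED }

    // Placeholder per-partition result from a list-offsets response.
    static class PartitionResult {
        final String topicPartition; final Error error; final long offset;
        PartitionResult(String tp, Error e, long o) { topicPartition = tp; error = e; offset = o; }
    }

    // Buckets each partition: success -> fetched, retriable -> "retry", auth failure -> "unauthorized".
    static Map<String, Set<String>> triage(List<PartitionResult> results, Map<String, Long> fetched) {
        Set<String> retry = new HashSet<>();
        Set<String> unauthorized = new HashSet<>();
        for (PartitionResult r : results) {
            switch (r.error) {
                case NONE: fetched.put(r.topicPartition, r.offset); break;
                case LEADER_NOT_AVAILABLE: retry.add(r.topicPartition); break;  // transient, retry later
                case TOPIC_AUTHORIZATION_FAILED: unauthorized.add(r.topicPartition); break;
            }
        }
        Map<String, Set<String>> buckets = new HashMap<>();
        buckets.put("retry", retry);
        buckets.put("unauthorized", unauthorized);
        return buckets;
    }

    public static void main(String[] args) {
        Map<String, Long> fetched = new HashMap<>();
        Map<String, Set<String>> buckets = triage(Arrays.asList(
                new PartitionResult("t-0", Error.NONE, 42L),
                new PartitionResult("t-1", Error.LEADER_NOT_AVAILABLE, -1L),
                new PartitionResult("t-2", Error.TOPIC_AUTHORIZATION_FAILED, -1L)), fetched);
        System.out.println(fetched + " " + buckets);
    }
}
```

Separating the three outcomes lets the caller commit the successes immediately while re-requesting only the retriable partitions, which is the shape of the real method's ListOffsetResult.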
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222140560

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

(quotes the same CoordinatorRuntimeTest snippet as the comment above)
[GitHub] [kafka] jlprat commented on pull request #13824: MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde
jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-1581463300

Test failures all seem unrelated, and I created the corresponding Jira tickets for the ones for which I couldn't find an already existing one: https://issues.apache.org/jira/browse/KAFKA-15070, https://issues.apache.org/jira/browse/KAFKA-15071 and https://issues.apache.org/jira/browse/KAFKA-15072

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Created] (KAFKA-15072) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition
Josep Prat created KAFKA-15072:
--

Summary: Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition
Key: KAFKA-15072
URL: https://issues.apache.org/jira/browse/KAFKA-15072
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.5.0
Reporter: Josep Prat

Test MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition became flaky again, but it's a different error this time.

Occurrence: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testReplicationWithEmptyPartition__/]

h3. Error Message
{code:java}
java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not start in time on cluster: backup-connect-cluster{code}

h3. Stacktrace
{code:java}
java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not start in time on cluster: backup-connect-cluster
    at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:301)
    at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:912)
    at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicationWithEmptyPartition(MirrorConnectorsIntegrationBaseTest.java:415)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeT
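The assertion that failed here, assertConnectorAndAtLeastNumTasksAreRunning, waits for a condition (connector tasks reporting RUNNING) to hold before a timeout elapses. A generic sketch of that poll-until-timeout idiom; the helper name and signature below are invented for illustration and are not Kafka Connect's actual API:

```java
import java.util.function.BooleanSupplier;

public class WaitUntilDemo {
    // Polls `condition` every `intervalMs` until it holds or `timeoutMs` elapses.
    // Returns whether the condition eventually held.
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(intervalMs);
        }
        return condition.getAsBoolean();  // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true ~50 ms in, so the wait succeeds well before the timeout.
        boolean ok = waitForCondition(() -> System.currentTimeMillis() - start > 50, 2000, 10);
        System.out.println(ok);  // prints true
    }
}
```

Flakiness in such tests usually comes from the timeout being too tight for a loaded CI machine: the condition would hold eventually, but the deadline fires first and the assertion error above is thrown.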
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222117410

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

(quotes the same CoordinatorRuntimeTest snippet as the comments above, ending at the line)
+private int allowedWrites = 1;

Review Comment:
   nit: we don't need to set 1 right? It will not be used.
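The nit concerns MockPartitionWriter's allowedWrites field: its initializer of 1 is always overwritten, since the no-arg constructor delegates with Integer.MAX_VALUE and the other constructor assigns the parameter. The counter itself implements a simple fault-injection budget: append succeeds a fixed number of times and then throws. A standalone sketch of that idiom, with simplified names invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedWriterDemo {
    // Accepts at most `allowedWrites` appends, then fails every subsequent one.
    static class BoundedWriter {
        private final List<String> log = new ArrayList<>();
        private int allowedWrites;

        BoundedWriter(int allowedWrites) { this.allowedWrites = allowedWrites; }

        long append(String record) {
            if (allowedWrites-- > 0) {
                log.add(record);
                return log.size() - 1;  // offset of the record just appended
            }
            throw new IllegalStateException("append failed.");
        }
    }

    public static void main(String[] args) {
        BoundedWriter writer = new BoundedWriter(2);
        System.out.println(writer.append("a"));  // prints 0
        System.out.println(writer.append("b"));  // prints 1
        try {
            writer.append("c");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());  // prints append failed.
        }
    }
}
```

With a budget of n, a test can verify that the n-th write succeeds and the (n+1)-th surfaces the append failure through the runtime's error handling.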
[jira] [Updated] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
[ https://issues.apache.org/jira/browse/KAFKA-15071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josep Prat updated KAFKA-15071:
---
Component/s: core

> Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
> --
>
> Key: KAFKA-15071
> URL: https://issues.apache.org/jira/browse/KAFKA-15071
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.5.0
> Reporter: Josep Prat
> Priority: Major
>
> Test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> became flaky again, but it fails for a different reason. In this case it
> might be a missing cleanup.
> The values of the parameters are Type=ZK, MetadataVersion=3.5-IV2,
> Security=PLAINTEXT
> Related to https://issues.apache.org/jira/browse/KAFKA-13737
> Occurred:
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.admin/LeaderElectionCommandTest/Build___JDK_8_and_Scala_2_123__Type_ZK__Name_testPreferredReplicaElection__MetadataVersion_3_5_IV2__Security_PLAINTEXT/
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic
> '__consumer_offsets' already exists.{code}
> {{ }}
> h3. Standard Output
> {code:java}
> Successfully completed leader election (UNCLEAN) for partitions
> unclean-topic-0 [2023-06-07 14:42:33,845] ERROR [QuorumController id=3000]
> writeNoOpRecord: unable to start processing because of
> RejectedExecutionException. Reason: The event queue is shutting down
> (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:42,699]
> WARN [AdminClient clientId=adminclient-65] Connection to node -2
> (localhost/127.0.0.1:35103) could not be established. Broker may not be
> available. 
(org.apache.kafka.clients.NetworkClient:814) Successfully > completed leader election (UNCLEAN) for partitions unclean-topic-0 > [2023-06-07 14:42:44,416] ERROR [QuorumController id=0] writeNoOpRecord: > unable to start processing because of RejectedExecutionException. Reason: The > event queue is shutting down > (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:44,716] > WARN maxCnxns is not configured, using default value 0. > (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:42:44,765] > WARN No meta.properties file under dir > /tmp/kafka-2117748934951771120/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:44,986] WARN No > meta.properties file under dir /tmp/kafka-5133306871105583937/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,214] WARN No > meta.properties file under dir /tmp/kafka-8449809620400833553/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,634] WARN > [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,634] WARN > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,872] WARN > [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-1. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:46,010] WARN > [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for > fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, > maxBytes=10485760, > fetchData={__consumer_offsets-3=PartitionData(topicId=vAlEsYVbTFClcpnVRp3AOw, > fetchOffset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 2 > was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) > at > kafka.server.
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222115234 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -39,7 +40,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { /** * The accumulator. */ -private final EventAccumulator accumulator; Review Comment: Is this changed from integer to topic partition so that we can use different coordinator state partitions (i.e. consumer offsets vs transactional state)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
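The question above turns on the accumulator's key type. Below is a minimal, hypothetical sketch (the class and method names are invented, not Kafka's actual `EventAccumulator`) of an accumulator keyed by a generic type, illustrating why moving the key from `Integer` to `TopicPartition` would let one processor keep separate per-key queues for different coordinator state partitions (e.g. `__consumer_offsets` vs `__transaction_state`):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: one FIFO queue of events per key, where the key
// could be any type (an Integer partition id, or a full TopicPartition).
public class KeyedAccumulatorSketch<K, T> {
    private final Map<K, Queue<T>> queues = new HashMap<>();

    // Append an event to the queue for its key, creating the queue lazily.
    public synchronized void add(K key, T event) {
        queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
    }

    // Take the oldest event for a key, or null if none is pending.
    public synchronized T poll(K key) {
        Queue<T> q = queues.get(key);
        return q == null ? null : q.poll();
    }
}
```

With a generic key, the same structure works unchanged whether events are grouped by a plain partition number or by a `(topic, partition)` pair, which appears to be the point of the diff under review.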
[jira] [Updated] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
[ https://issues.apache.org/jira/browse/KAFKA-15071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-15071: --- Affects Version/s: 3.5.0 > Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection > for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT > -- > > Key: KAFKA-15071 > URL: https://issues.apache.org/jira/browse/KAFKA-15071 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Josep Prat >Priority: Major > > The test > kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection became flaky > again, but is failing for a different reason. In this case it might be a > missing cleanup. > The values of the parameters are Type=ZK, MetadataVersion=3.5-IV2, > Security=PLAINTEXT > Related to https://issues.apache.org/jira/browse/KAFKA-13737 > Occurred: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.admin/LeaderElectionCommandTest/Build___JDK_8_and_Scala_2_123__Type_ZK__Name_testPreferredReplicaElection__MetadataVersion_3_5_IV2__Security_PLAINTEXT/ > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Standard Output > {code:java} > Successfully completed leader election (UNCLEAN) for partitions > unclean-topic-0 [2023-06-07 14:42:33,845] ERROR [QuorumController id=3000] > writeNoOpRecord: unable to start processing because of > RejectedExecutionException. Reason: The event queue is shutting down > (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:42,699] > WARN [AdminClient clientId=adminclient-65] Connection to node -2 > (localhost/127.0.0.1:35103) could not be established. Broker may not be > available. 
(org.apache.kafka.clients.NetworkClient:814) Successfully > completed leader election (UNCLEAN) for partitions unclean-topic-0 > [2023-06-07 14:42:44,416] ERROR [QuorumController id=0] writeNoOpRecord: > unable to start processing because of RejectedExecutionException. Reason: The > event queue is shutting down > (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:44,716] > WARN maxCnxns is not configured, using default value 0. > (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:42:44,765] > WARN No meta.properties file under dir > /tmp/kafka-2117748934951771120/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:44,986] WARN No > meta.properties file under dir /tmp/kafka-5133306871105583937/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,214] WARN No > meta.properties file under dir /tmp/kafka-8449809620400833553/meta.properties > (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,634] WARN > [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,634] WARN > [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,872] WARN > [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received > UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-1. This > error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:46,010] WARN > [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for > fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, > maxBytes=10485760, > fetchData={__consumer_offsets-3=PartitionData(topicId=vAlEsYVbTFClcpnVRp3AOw, > fetchOffset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 2 > was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) > at > kafka.server.BrokerBlockingSender.
[jira] [Commented] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
[ https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730272#comment-17730272 ] Josep Prat commented on KAFKA-14971: Another occurrence: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_11_and_Scala_2_13___testSyncTopicConfigs__/ > Flaky Test > org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs > -- > > Key: KAFKA-14971 > URL: https://issues.apache.org/jira/browse/KAFKA-14971 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Priority: Major > Labels: flaky-test, mirror-maker > > The test testSyncTopicConfigs in > `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs` > seems to be flaky. Found here: > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests] > Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed. > > > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! ==> expected: <2000> but > was: <8640> > Stacktrace > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! 
==> expected: <2000> but > was: <8640> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.e
[GitHub] [kafka] junrao commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
junrao commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1221908399 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) { manager.addOffsetFetchRequest(event.partitions); return true; } + +private boolean process(final MetadataUpdateApplicationEvent event) { +metadata.requestUpdateForNewTopics(); Review Comment: MetadataUpdateApplicationEvent is only for new topics. Could we give it a clearer name? Will there be a separate event for a full metadata update? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher Review Comment: Is the TODO still needed? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? 
tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher +// fetcher.clearBufferedDataForUnassignedPartitions(partitions); + +// make sure the offsets of topic partitions the consumer is unsubscribing from +// are committed since there will be no following rebalance +commit(subscriptions.allConsumed()); + +log.info("Assigned to partition(s): {}", Utils.join(partitions, ", ")); +if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) + updateMetadata(time.milliseconds()); Review Comment: The existing consumer seems to only update the metadata for new topics. `updateMetadata` doesn't make it clear that it's for new topics. Could we make it clear? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataUpdateApplicationEvent.java: ## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class MetadataUpdateApplicationEvent extends ApplicationEvent { + +private final long timestamp; Review Comment: `timestamp` seems unused? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) { @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +// fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); Review Comment: Is this still needed? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java: ## @@ -166,7 +208,7
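The validation order shown in the `assign()` diff above can be mirrored in a standalone sketch. This is a hypothetical, self-contained version (local `TopicPartition` stand-in, and a plain blank check replacing `Utils.isBlank`), not the actual `PrototypeAsyncConsumer` code:

```java
import java.util.Collection;

// Sketch of the argument-validation contract in the diff above:
// null collection -> IllegalArgumentException; empty collection -> the
// caller should unsubscribe; null/blank topic -> IllegalArgumentException.
public class AssignValidationSketch {
    // Local stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Returns true if the assignment should proceed, false if the empty
    // collection means "unsubscribe" instead.
    static boolean validateAssignment(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException(
                "Topic partitions collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            return false; // caller unsubscribes rather than assigning
        }
        for (TopicPartition tp : partitions) {
            String topic = (tp != null) ? tp.topic() : null;
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException(
                    "Topic partitions to assign to cannot have null or empty topic");
            }
        }
        return true;
    }
}
```

The empty-collection-means-unsubscribe convention matches the behavior of the existing `KafkaConsumer.assign`, which the new async consumer is reimplementing.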
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581446668 > ThreadLocal variables are not as frequently used in Kafka, but I can see a potential argument for usage here. One tricky part about thread locals is testing. Any ideas on how we could do this? Would we need to spin up a thread to assign the different boolean values? No, the test method would need to specifically mark its thread as wanting to poison the thread or not before running the rest of the test. In the `TransactionManagerTest`, there was only one place I had to specifically call it to get the tests to pass.
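kirktrue's point — that a test marks its own thread rather than spinning up a new one — can be illustrated with a minimal `ThreadLocal` sketch. The class and method names here are invented for illustration; this is not the actual `TransactionManager` code:

```java
// Sketch of a per-thread flag: each thread sees its own value, and a
// thread that never sets the flag gets the default (false). A test can
// therefore mark only its own thread before running.
public class ThreadLocalFlagSketch {
    // Defaults to false on every thread that has not set it.
    private static final ThreadLocal<Boolean> shouldPoison =
        ThreadLocal.withInitial(() -> false);

    public static void markThreadAsPoisoning(boolean value) {
        shouldPoison.set(value);
    }

    public static boolean isPoisoning() {
        return shouldPoison.get();
    }

    public static void main(String[] args) throws Exception {
        // The current (test) thread marks itself.
        markThreadAsPoisoning(true);
        System.out.println("test thread: " + isPoisoning());

        // A freshly spawned thread still sees the default value,
        // unaffected by the test thread's flag.
        Thread other = new Thread(() ->
            System.out.println("other thread: " + isPoisoning()));
        other.start();
        other.join();
    }
}
```

Running `main` prints `test thread: true` followed by `other thread: false`, showing why no extra thread is needed just to vary the boolean in tests.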
[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730271#comment-17730271 ] Josep Prat commented on KAFKA-15020: Seen this test failing with a different error: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/ h3. Error Message {code:java} java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Timed out while awaiting expected assignment Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The current assignment is []{code} h3. Stacktrace {code:java} java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Timed out while awaiting expected assignment Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The current assignment is [] at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:206) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(FetchFromFollowerIntegrationTest.scala:211) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9$adapted(FetchFromFollowerIntegrationTest.scala:211) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:211) at integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:251) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCol
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581443881 ThreadLocal variables are not as frequently used in Kafka, but I can see a potential argument for usage here. One tricky part about thread locals is testing. Any ideas on how we could do this? Would we need to spin up a thread to assign the different boolean values?
[jira] [Resolved] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
[ https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-14539. - Resolution: Fixed > Simplify StreamsMetadataState by replacing the Cluster metadata with > partition info map > --- > > Key: KAFKA-14539 > URL: https://issues.apache.org/jira/browse/KAFKA-14539 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Danica Fine >Priority: Major > > We can clean up the StreamsMetadataState class a bit by removing the > #onChange invocation that currently occurs within > StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` > parameter in that callback. Instead of building a fake Cluster object from > the map of partition info when we invoke #onChange inside the > StreamsPartitionAssignor#onAssignment method, we can just directly pass in > the `Map` and replace the usage of `Cluster` > everywhere in StreamsMetadataState > (I believe the current system is a historical artifact from when we used to > require passing in a {{Cluster}} for the default partitioning strategy, which > the StreamMetadataState needs to compute the partition for a key. At some > point in the past we provided a better way to get the default partition, so > we no longer need a {{Cluster}} parameter/field at all) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
[ https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-14539: Fix Version/s: 3.6.0 > Simplify StreamsMetadataState by replacing the Cluster metadata with > partition info map > --- > > Key: KAFKA-14539 > URL: https://issues.apache.org/jira/browse/KAFKA-14539 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Danica Fine >Priority: Major > Fix For: 3.6.0 > > > We can clean up the StreamsMetadataState class a bit by removing the > #onChange invocation that currently occurs within > StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` > parameter in that callback. Instead of building a fake Cluster object from > the map of partition info when we invoke #onChange inside the > StreamsPartitionAssignor#onAssignment method, we can just directly pass in > the `Map` and replace the usage of `Cluster` > everywhere in StreamsMetadataState > (I believe the current system is a historical artifact from when we used to > require passing in a {{Cluster}} for the default partitioning strategy, which > the StreamMetadataState needs to compute the partition for a key. At some > point in the past we provided a better way to get the default partition, so > we no longer need a {{Cluster}} parameter/field at all)
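The ticket's core idea — passing the partition-info map directly instead of building a fake `Cluster` around it — can be sketched as follows. The types here are local stand-ins so the sketch compiles on its own; they are not the actual `org.apache.kafka.common` classes, and `partitionCounts` is an invented helper for one typical query the `Cluster` used to answer:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionInfoMapSketch {
    // Local stand-in for org.apache.kafka.common.TopicPartition, kept
    // here so the sketch has no dependency on kafka-clients.
    record TopicPartition(String topic, int partition) {}

    // Derive per-topic partition counts directly from the assignment's
    // partition list — the kind of lookup previously served by wrapping
    // the same data in a fake Cluster object.
    static Map<String, Integer> partitionCounts(List<TopicPartition> partitions) {
        Map<String, Integer> counts = new HashMap<>();
        for (TopicPartition tp : partitions) {
            counts.merge(tp.topic(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = partitionCounts(List.of(
            new TopicPartition("orders", 0),
            new TopicPartition("orders", 1),
            new TopicPartition("clicks", 0)));
        System.out.println(counts.get("orders")); // 2
    }
}
```

Since every query the old `Cluster` wrapper answered can be derived this way from the map itself, the wrapper adds no information and can be dropped, which is what the resolved ticket did.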
[jira] [Created] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
Josep Prat created KAFKA-15071: -- Summary: Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT Key: KAFKA-15071 URL: https://issues.apache.org/jira/browse/KAFKA-15071 Project: Kafka Issue Type: Bug Reporter: Josep Prat The test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection became flaky again, but is failing for a different reason. In this case it might be a missing cleanup. The values of the parameters are Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT Related to https://issues.apache.org/jira/browse/KAFKA-13737 Occurred: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.admin/LeaderElectionCommandTest/Build___JDK_8_and_Scala_2_123__Type_ZK__Name_testPreferredReplicaElection__MetadataVersion_3_5_IV2__Security_PLAINTEXT/ h3. Error Message {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.{code} h3. Stacktrace {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.{code} h3. Standard Output {code:java} Successfully completed leader election (UNCLEAN) for partitions unclean-topic-0 [2023-06-07 14:42:33,845] ERROR [QuorumController id=3000] writeNoOpRecord: unable to start processing because of RejectedExecutionException. Reason: The event queue is shutting down (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:42,699] WARN [AdminClient clientId=adminclient-65] Connection to node -2 (localhost/127.0.0.1:35103) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:814) Successfully completed leader election (UNCLEAN) for partitions unclean-topic-0 [2023-06-07 14:42:44,416] ERROR [QuorumController id=0] writeNoOpRecord: unable to start processing because of RejectedExecutionException. 
Reason: The event queue is shutting down (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:44,716] WARN maxCnxns is not configured, using default value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:42:44,765] WARN No meta.properties file under dir /tmp/kafka-2117748934951771120/meta.properties (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:44,986] WARN No meta.properties file under dir /tmp/kafka-5133306871105583937/meta.properties (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,214] WARN No meta.properties file under dir /tmp/kafka-8449809620400833553/meta.properties (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,634] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,634] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,872] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:46,010] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-3=PartitionData(topicId=vAlEsYVbTFClcpnVRp3AOw, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 2 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:407) at kafka.server.AbstractFetc
[jira] [Created] (KAFKA-15070) Flaky test kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic for codec zstd
Josep Prat created KAFKA-15070: -- Summary: Flaky test kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic for codec zstd Key: KAFKA-15070 URL: https://issues.apache.org/jira/browse/KAFKA-15070 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.5.0 Reporter: Josep Prat Flaky tests with the following traces and output: h3. Error Message org.opentest4j.AssertionFailedError: Timed out waiting for deletion of old segments h3. Stacktrace org.opentest4j.AssertionFailedError: Timed out waiting for deletion of old segments at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:123) ... h3. Standard Output [2023-06-07 16:03:59,974] WARN [LocalLog partition=log-0, dir=/tmp/kafka-6339499869249617477] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:01,691] WARN [LocalLog partition=log-0, dir=/tmp/kafka-6391328203703920459] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:02,661] WARN [LocalLog partition=log-0, dir=/tmp/kafka-7107559685120209313] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:04,449] WARN [LocalLog partition=log-0, dir=/tmp/kafka-2334095685379242376] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:12,059] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-4306370019245327987] Could not find offset index file corresponding to log file /tmp/kafka-4306370019245327987/log-0/0300.log, recovering segment and rebuilding index files... 
(kafka.log.LogLoader:74) [2023-06-07 16:04:21,424] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-8549848301585177643] Could not find offset index file corresponding to log file /tmp/kafka-8549848301585177643/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:04:42,679] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-8308685679443421785] Could not find offset index file corresponding to log file /tmp/kafka-8308685679443421785/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:04:50,435] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-2686097435338562303] Could not find offset index file corresponding to log file /tmp/kafka-2686097435338562303/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:07:16,263] WARN [LocalLog partition=log-0, dir=/tmp/kafka-5435804108212698551] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:07:35,193] WARN [LocalLog partition=log-0, dir=/tmp/kafka-4310277229895025994] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:07:55,323] WARN [LocalLog partition=log-0, dir=/tmp/kafka-3364951894697258113] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:08:16,286] WARN [LocalLog partition=log-0, dir=/tmp/kafka-3161518940405121110] Record format version has been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:35:03,765] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-2385863108707929062] Could not find offset index file corresponding to log file /tmp/kafka-2385863108707929062/log-0/0300.log, recovering segment and rebuilding index files... 
(kafka.log.LogLoader:74) [2023-06-07 16:35:06,406] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-5380450050465409057] Could not find offset index file corresponding to log file /tmp/kafka-5380450050465409057/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:35:09,061] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-7510941634638265317] Could not find offset index file corresponding to log file /tmp/kafka-7510941634638265317/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:35:11,593] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-7423113520781905391] Could not find offset index file corresponding to log file /tmp/kafka-7423113520781905391/log-0/0300.log, recovering segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 16:35:14,159] ERROR [LogLoader partition=log-0, dir=/tmp/kafka-2120426496175304835] Could not find offset index file corresponding to log file /tmp/kafka-212042649617530483
[jira] [Commented] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730264#comment-17730264 ] Josep Prat commented on KAFKA-15052: Adding another occurrence: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/ > Fix flaky test QuorumControllerTest.testBalancePartitionLeaders() > - > > Key: KAFKA-15052 > URL: https://issues.apache.org/jira/browse/KAFKA-15052 > Project: Kafka > Issue Type: Test >Reporter: Dimitar Dimitrov >Assignee: Dimitar Dimitrov >Priority: Major > > Test failed at > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] > as well as in various local runs. > The test creates a topic, fences a broker, notes partition imbalance due to > another broker taking over the partition the fenced broker lost, re-registers > and unfences the fenced broker, sends {{AlterPartition}} for the lost > partition adding the now unfenced broker back to its ISR, then waits for the > partition imbalance to disappear. > The local failures seem to happen when the brokers (including the ones that > never get fenced by the test) accidentally get fenced by losing their session > due to reaching the (aggressively low for test purposes) session timeout. > The Cloudbees failure quoted above also seems to indicate that this happened: > {code:java} > ...[truncated 738209 chars]... > 23. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write > event for maybeBalancePartitionLeaders because scheduled (DEFERRED), > checkIntervalNs (OptionalLong[10]) and isImbalanced (true) > (org.apache.kafka.controller.QuorumController:1401) > [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 > because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-0, foo-1, foo-2 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for > foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: > 3 -> 4, partitionEpoch: 4 -> 5 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, > appendTimestamp=240, > records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, > topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, > brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), > prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) > [2023-06-02 18:17:22,205] 
DEBUG [QuorumController id=0] Creating in-memory > snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197) > [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:512) > [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation > maybeFenceReplicas(451616131) will be completed when the log reaches offset > 27. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 > because its session has timed out. > (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-1 > (org.apache.kafka.controller.Rep
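[Editor's note] The failure mode described in the KAFKA-15052 report above — brokers losing their session because the test uses an aggressively low session timeout — can be illustrated with a toy check. Everything below is illustrative; the names and numbers are not taken from the actual QuorumController test.

```java
// Illustrative only: a broker is spuriously "fenced" when processing
// jitter exceeds the slack between the heartbeat interval and the
// session timeout. With test-style timeouts the slack is tiny, so
// ordinary jitter fences a perfectly healthy broker.
public class SessionTimeoutSketch {
    static boolean fenced(long heartbeatIntervalMs, long sessionTimeoutMs, long jitterMs) {
        long nextHeartbeatArrivesMs = heartbeatIntervalMs + jitterMs;
        return nextHeartbeatArrivesMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Generous production-style settings: 50 ms of jitter is harmless.
        if (fenced(2000, 18000, 50)) throw new AssertionError();
        // Aggressive test-style settings: the same jitter fences the broker.
        if (!fenced(10, 50, 50)) throw new AssertionError();
    }
}
```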
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1581417136

@ijuma Here are the results of my local test runs of test suites which use LATEST_0_8_2 and VerifiableProducer:

* test_verifiable_producer.py: 25/25 PASS
* compatibility_test_new_broker_test.py: 44/52
  producer_version=0.8.2.2.consumer_version=0.8.2.2.compression_types=.none.new_consumer=False.timestamp_type=None PASS
* upgrade_test.py: 51/54
  from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.none PASS
  from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy FAIL

Looking into the snappy failure, it appears that the broker can't find the native snappy library, and the client is receiving an opaque InvocationTargetException. I think this is an environment-specific problem, and it also appears on trunk. Are you able to start a branch build system test for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
[ https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730263#comment-17730263 ] Josep Prat commented on KAFKA-8073: --- I can see this still failing: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.api/UserQuotaTest/Build___JDK_17_and_Scala_2_13___testThrottledProducerConsumer_String__quorum_kraft/ > Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer > -- > > Key: KAFKA-8073 > URL: https://issues.apache.org/jira/browse/KAFKA-8073 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Bill Bejeck >Priority: Critical > > Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/] > > Stacktrace and STDOUT > {noformat} > Error Message > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > Stacktrace > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229) > at > kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215) > at > kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck commented on PR #13751: URL: https://github.com/apache/kafka/pull/13751#issuecomment-1581399670 merged #13751 into trunk
[GitHub] [kafka] bbejeck merged pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck merged PR #13751: URL: https://github.com/apache/kafka/pull/13751
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581369789

@jolshan here's the basic idea:

```diff
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19f4c54b65..636c7907fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -240,6 +240,9 @@ public class Sender implements Runnable {
     public void run() {
         log.debug("Starting Kafka producer I/O thread.");
 
+        if (transactionManager != null)
+            transactionManager.setPoisonStateOnInvalidTransition(true);
+
         // main loop, runs until close is called
         while (running) {
             try {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 02f689da68..aa6de49aef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -122,6 +122,7 @@ public class TransactionManager {
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
+    private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
     private PendingStateTransition pendingTransition;
 
     // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
@@ -278,6 +279,7 @@ public class TransactionManager {
         this.newPartitionsInTransaction = new HashSet<>();
         this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
+        this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false);
         this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
         this.pendingTxnOffsetCommits = new HashMap<>();
         this.partitionsWithUnresolvedSequences = new HashMap<>();
@@ -287,6 +289,10 @@ public class TransactionManager {
         this.apiVersions = apiVersions;
     }
 
+    void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
+        shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
+    }
+
     public synchronized TransactionalRequestResult initializeTransactions() {
         return initializeTransactions(ProducerIdAndEpoch.NONE, CallingThread.APPLICATION);
     }
@@ -1068,7 +1074,7 @@ public class TransactionManager {
             String message = idString + "Invalid transition attempted from state " + currentState.name() +
                 " to state " + target.name();
 
-            if (callingThread.shouldPoisonState()) {
+            if (shouldPoisonStateOnInvalidTransition.get()) {
                 currentState = State.FATAL_ERROR;
                 lastError = new IllegalStateException(message);
                 throw lastError;
```
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581356873 @jolshan The more I look at the code, the more I dislike it. Having the extra `CallingThread` parameter to distinguish application threads from the `Sender` thread is just... ugly. What do you think about storing a `Thread` reference or creating a `ThreadLocalData` in `TransactionManager`? That way the `Sender` can inform `TransactionManager` which thread it's using and we can eliminate all of that noise. What do you think?
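[Editor's note] The `Thread`-reference idea floated in the comment above can be sketched as follows. All names here are illustrative, not the actual `TransactionManager` API: the point is only that once the `Sender` registers its I/O thread, a current-thread comparison replaces the explicit `CallingThread` parameter.

```java
// Hypothetical sketch: the Sender registers its I/O thread once, and
// invalid-transition handling compares Thread.currentThread() against it
// instead of threading a CallingThread enum through every call site.
public class ThreadAwareTransitionSketch {
    private volatile Thread ioThread;
    private volatile boolean poisoned;

    // Called once from the Sender's run() loop.
    void registerIoThread(Thread thread) {
        this.ioThread = thread;
    }

    // On an invalid transition, the background I/O thread poisons the
    // state (fatal error); any other thread just gets the exception.
    void onInvalidTransition(String message) {
        if (Thread.currentThread() == ioThread) {
            poisoned = true;
        }
        throw new IllegalStateException(message);
    }

    boolean isPoisoned() {
        return poisoned;
    }

    public static void main(String[] args) {
        ThreadAwareTransitionSketch tm = new ThreadAwareTransitionSketch();
        Thread io = new Thread(() -> {
            try {
                tm.onInvalidTransition("bad transition on I/O thread");
            } catch (IllegalStateException expected) {
                // expected: the poisoned flag is what matters here
            }
        });
        tm.registerIoThread(io);
        io.start();
        try {
            io.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (!tm.isPoisoned()) throw new AssertionError("I/O thread should poison state");
        // An application thread hitting the same path would not set the flag.
        if (!Thread.currentThread().equals(io) && tm.ioThread != Thread.currentThread()) {
            try {
                tm.onInvalidTransition("bad transition on application thread");
            } catch (IllegalStateException expected) {
                // expected; state stays merely poisoned by the earlier I/O call
            }
        }
    }
}
```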
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222035271

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java:
## @@ -16,21 +16,23 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import org.apache.kafka.common.TopicPartition;
+
 /**
  * The base event type used by all events processed in the
  * coordinator runtime.
  */
-public interface CoordinatorEvent extends EventAccumulator.Event {
+public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition> {
 
     /**
-     * Runs the event.
+     * Executes the event.
      */
     void run();
 
     /**
      * Completes the event with the provided exception.
      *
-     * @param exception An exception to complete the event with.
+     * @param exception An exception if the processing of the event failed or null.

Review Comment: nit: maybe "An exception if the processing of the event failed or null otherwise". I read this as exception if the event failed or was null.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1222035054 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -977,18 +1058,20 @@ void handleCoordinatorReady() { initProducerIdVersion.maxVersion() >= 3; } -private void transitionTo(State target) { Review Comment: I overloaded `transitionTo` so as to let callers with `null` use the more concise version.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1222034380 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -471,9 +547,9 @@ synchronized void requestEpochBumpForPartition(TopicPartition tp) { this.partitionsToRewriteSequences.add(tp); } -private void bumpIdempotentProducerEpoch(@SuppressWarnings("SameParameterValue") CallingThread callingThread) { Review Comment: Strictly speaking, it isn't needed, so I removed it.
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222033908 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalA
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1222029897 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1040 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalArgume
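The write path described in the Javadoc above (apply the records, park the response, complete it once the records are committed) can be sketched in a few lines. This is a toy illustration, not Kafka's CoordinatorRuntime: `TinyRuntime`, `ParkedWrite`, `write`, and `onCommit` are invented names, and real code would track offsets per partition and handle failures.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model of "park the response until the records are committed".
class TinyRuntime {
    private long nextOffset = 0;
    private final Queue<ParkedWrite> parked = new ArrayDeque<>();

    // A parked write remembers the last offset it appended and its pending response.
    record ParkedWrite(long lastOffset, CompletableFuture<String> response, String value) {}

    // A "write" operation: append records, park the response future.
    CompletableFuture<String> write(int recordCount, String responseValue) {
        nextOffset += recordCount;
        CompletableFuture<String> future = new CompletableFuture<>();
        parked.add(new ParkedWrite(nextOffset - 1, future, responseValue));
        return future;
    }

    // Called when the partition's committed offset (high watermark) advances:
    // deliver every response whose records are now committed.
    void onCommit(long committedOffset) {
        while (!parked.isEmpty() && parked.peek().lastOffset() <= committedOffset) {
            ParkedWrite w = parked.poll();
            w.response().complete(w.value());
        }
    }
}

public class Main {
    public static void main(String[] args) {
        TinyRuntime rt = new TinyRuntime();
        var f1 = rt.write(2, "ok-1");  // records at offsets 0 and 1
        var f2 = rt.write(1, "ok-2");  // record at offset 2
        rt.onCommit(1);                // only offsets 0..1 are committed
        System.out.println(f1.isDone() + " " + f2.isDone()); // true false
        rt.onCommit(2);
        System.out.println(f2.join()); // ok-2
    }
}
```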
[GitHub] [kafka] mumrah commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
mumrah commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1222020155 ## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ## @@ -27,15 +23,21 @@ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - - def this() = this(System.currentTimeMillis(), System.nanoTime()) +public class MockTime extends org.apache.kafka.common.utils.MockTime { +public final MockScheduler scheduler; Review Comment: Can we make this private and add an accessor? Or would that break a bunch of usages? I think Scala usages might be ok with a no-parens method call -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
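The change being suggested would look roughly like the sketch below. `MockScheduler` is a stand-in stub here so the example compiles on its own; the point is that a zero-argument accessor keeps Scala call sites source-compatible, since Scala can invoke a parameterless Java method without parentheses.

```java
// Stand-in so the sketch compiles without Kafka on the classpath.
class MockScheduler {}

class MockTime {
    // Previously a public final field; now private with an accessor.
    // Scala callers can still write `time.scheduler` (no parens).
    private final MockScheduler scheduler = new MockScheduler();

    public MockScheduler scheduler() {
        return scheduler;
    }
}

public class Main {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        System.out.println(time.scheduler() != null); // true
    }
}
```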
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1222019325 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon } public synchronized void transitionToUninitialized(RuntimeException exception) { -transitionTo(State.UNINITIALIZED); +transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND); Review Comment: Correct. The exception is stored in `lastError`, mostly for the next call to `maybeFailWithError`.
[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
dajac commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221998443 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + /** + * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing. + * Creation starts the verification process. Otherwise return null. + */ + def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized { +if (hasOngoingTransaction(producerId)) + null +else + verificationGuard(producerId, true) + } + + /** + * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null. + */ + def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized { Review Comment: Yeah, keeping it makes sense.
[GitHub] [kafka] mumrah closed pull request #13825: DO NOT MERGE: Testing a Github action
mumrah closed pull request #13825: DO NOT MERGE: Testing a Github action URL: https://github.com/apache/kafka/pull/13825
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221986657 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + /** + * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing. + * Creation starts the verification process. Otherwise return null. + */ + def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized { +if (hasOngoingTransaction(producerId)) + null +else + verificationGuard(producerId, true) + } + + /** + * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null. + */ + def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized { Review Comment: Hmmm... ``` /Users/justineolshan/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2792:56: method getOrMaybeCreateVerificationGuard in class UnifiedLog cannot be accessed as a member of kafka.log.UnifiedLog from class ReplicaManagerTest in package server ``` I can potentially refactor this to not check verification guard in these tests, but I do think it is useful. What do you think about leaving as not package private?
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. + // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on + // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append. + // There are two phases -- the first append to the log and subsequent appends. + // + // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and + // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction + // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could + // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker, + // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not + // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2. + // + // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. 
This field remains until the + // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still + // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and + // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. + if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) +if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)) Review Comment: This is a holdover from the previous PR that also included tentative state. Since we may move that, I can make this all one. (But I may make it a helper since this is quite a lot of logic.)
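The unique-guard idea from phase 1 can be made concrete with a small sketch. `VerificationGuards`, `startVerification`, and `mayAppend` are hypothetical names invented for this example (the real logic lives in the UnifiedLog changes quoted above); the key point is that reference identity of a fresh sentinel `Object` is what defeats the ABA sequence of a stale "verified" response arriving after an abort marker.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the unique verification guard (not Kafka's implementation).
class VerificationGuards {
    private final Map<Long, Object> guards = new HashMap<>(); // producerId -> guard

    // Phase 1: starting verification creates a fresh sentinel object.
    Object startVerification(long producerId) {
        Object guard = new Object(); // identity, not equality, is what matters
        guards.put(producerId, guard);
        return guard;
    }

    // An abort marker (or transaction completion) invalidates any outstanding guard.
    void onAbortOrComplete(long producerId) {
        guards.remove(producerId);
    }

    boolean mayAppend(long producerId, Object presentedGuard, boolean hasOngoingTransaction) {
        if (hasOngoingTransaction)
            return true; // phase 2: in-memory transaction state covers us
        // Only the exact guard instance currently on file is accepted.
        return presentedGuard != null && presentedGuard == guards.get(producerId);
    }
}

public class Main {
    public static void main(String[] args) {
        VerificationGuards vg = new VerificationGuards();
        long pid = 24L;
        Object staleGuard = vg.startVerification(pid);
        vg.onAbortOrComplete(pid);  // abort marker written before the append lands
        vg.startVerification(pid);  // a new transaction starts verification
        // The stale guard from before the abort no longer matches: append rejected.
        System.out.println(vg.mayAppend(pid, staleGuard, false)); // false
        Object freshGuard = vg.startVerification(pid);
        System.out.println(vg.mayAppend(pid, freshGuard, false)); // true
    }
}
```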
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
wcarlson5 commented on code in PR #13751: URL: https://github.com/apache/kafka/pull/13751#discussion_r1221950345 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -308,12 +308,17 @@ public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String * * @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions - * @param clusterMetadata the current clusterMetadata {@link Cluster} + * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo} */ synchronized void onChange(final Map> activePartitionHostMap, final Map> standbyPartitionHostMap, - final Cluster clusterMetadata) { -this.clusterMetadata = clusterMetadata; + final Map topicPartitionInfo) { +this.partitionsByTopic = new HashMap<>(); +topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic Review Comment: Can we format this so it is a bit easier to read?
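One way the lambda could be laid out for readability, per the review comment. The record types below are stand-ins so the sketch compiles on its own (the real `TopicPartition` and `PartitionInfo` are `org.apache.kafka.common` classes), and iterating with `Map.forEach` plus `computeIfAbsent` replaces the `entrySet()` stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Main {
    // Stand-ins for Kafka's TopicPartition / PartitionInfo.
    record TopicPartition(String topic, int partition) {}
    record PartitionInfo(String topic, int partition) {}

    // Group the per-partition metadata by topic name.
    static Map<String, List<PartitionInfo>> groupByTopic(
            Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
        Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();
        topicPartitionInfo.forEach((topicPartition, partitionInfo) ->
            partitionsByTopic
                .computeIfAbsent(topicPartition.topic(), topic -> new ArrayList<>())
                .add(partitionInfo));
        return partitionsByTopic;
    }

    public static void main(String[] args) {
        Map<TopicPartition, PartitionInfo> input = new HashMap<>();
        input.put(new TopicPartition("a", 0), new PartitionInfo("a", 0));
        input.put(new TopicPartition("a", 1), new PartitionInfo("a", 1));
        input.put(new TopicPartition("b", 0), new PartitionInfo("b", 0));
        Map<String, List<PartitionInfo>> out = groupByTopic(input);
        System.out.println(out.get("a").size()); // 2
        System.out.println(out.get("b").size()); // 1
    }
}
```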
[GitHub] [kafka] yashmayya commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode
yashmayya commented on PR #13530: URL: https://github.com/apache/kafka/pull/13530#issuecomment-1581235501 > which is oddly not showing up on the GitHub PR UI, although it does show up on my fork's branch which this PR is created from https://www.githubstatus.com/incidents/1g1gkh0qpyvs Looks like GitHub is having its zillionth incident of the year 🤦♂️
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback c // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled -// naturally because requests to alter consumer group offsets will fail if there are still active members in the group. +// naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members +// in the group. if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { -callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " + -"can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); +callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. 
" + Review Comment: I think just "modified" should be clear enough to users since this is synchronously surfaced to the user via the alter / reset offsets REST API response 👍 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); 
+cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -
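The reset branch being reviewed reduces to: list the consumer group's committed offsets, then map every listed partition to a null value so the downstream write path deletes it. Below is a self-contained sketch of that shape, using stand-in record types rather than Connect's real `TopicPartition` and `OffsetAndMetadata` (in the actual code, the committed offsets come from `Admin.listConsumerGroupOffsets`).

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    // Stand-ins for the real Kafka client classes.
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    // For a reset, every partition the group has offsets for maps to null,
    // which downstream code interprets as "delete this committed offset".
    static Map<TopicPartition, OffsetAndMetadata> offsetsToReset(
            Map<TopicPartition, OffsetAndMetadata> committed) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToWrite = new HashMap<>();
        committed.forEach((topicPartition, offset) -> offsetsToWrite.put(topicPartition, null));
        return offsetsToWrite;
    }

    public static void main(String[] args) {
        Map<TopicPartition, OffsetAndMetadata> committed = Map.of(
            new TopicPartition("t", 0), new OffsetAndMetadata(42L),
            new TopicPartition("t", 1), new OffsetAndMetadata(7L));
        Map<TopicPartition, OffsetAndMetadata> toWrite = offsetsToReset(committed);
        System.out.println(toWrite.size());                                      // 2
        System.out.println(toWrite.values().stream().allMatch(v -> v == null));  // true
    }
}
```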
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221909838 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + /** + * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing. + * Creation starts the verification process. Otherwise return null. + */ + def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized { +if (hasOngoingTransaction(producerId)) + null +else + verificationGuard(producerId, true) + } + + /** + * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null. + */ + def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized { Review Comment: This is also used in UnifiedLog when we create the object and when we do the final check. But I can make it package private.
[GitHub] [kafka] mehbey commented on pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp
mehbey commented on PR #13709: URL: https://github.com/apache/kafka/pull/13709#issuecomment-1581221839 Hey @showuon, the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation#KIP937) for this pull request (PR) is published, and there have been a few discussions. I wanted to notify you and see if you have any comments, as you originally brought up the idea of writing the KIP. Please take a look whenever you have the time, in case you haven't already seen it.
[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
C0urante commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221916704 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to 
resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); +} - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +// This should only occur for an offset reset request when: +// 1. There was a prior attempt to reset offsets +// OR +// 2. No offsets have been committed yet +if (offsetsToWrite.isEmpty()) { Review Comment: Good call--I agree that deleting the c
[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
C0urante commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221803692 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -374,19 +374,38 @@ public synchronized void connectorOffsets(String connName, Callback, Map> offsets, Callback cb) { +protected synchronized void modifyConnectorOffsets(String connName, Map, Map> offsets, Callback cb) { +if (!modifyConnectorOffsetsChecks(connName, cb)) { +return; +} + +if (offsets == null) { +worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), cb); +} else { +worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb); +} +} + +/** + * This method performs a few checks for external requests to modify (alter or reset) connector offsets and + * completes the callback exceptionally if any check fails. + * @param connName the name of the connector whose offsets are to be modified + * @param cb callback to invoke upon completion + * @return true if all the checks passed, false otherwise + */ +private boolean modifyConnectorOffsetsChecks(String connName, Callback cb) { if (!configState.contains(connName)) { cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); -return; +return false; } if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { -cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " + -"This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); -return; +cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " + Review Comment: Nit: "modified externally" may not be very clear to users. 
I think "altered" captures the general concept well enough even if they're doing a reset, but if you'd like to be more precise, maybe we can say "altered or reset"? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback c // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled -// naturally because requests to alter consumer group offsets will fail if there are still active members in the group. +// naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members +// in the group. if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { -callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " + -"can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); +callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " + Review Comment: Same thought RE "modified externally" (though seeing how "modified"/"modify" is used in some other log and exception messages in this class, I think just "modified" could work here too). 
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsT
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221911516 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2095,82 +2105,143 @@ class ReplicaManagerTest { } @Test - def testVerificationForTransactionalPartitions(): Unit = { -val tp = new TopicPartition(topic, 0) -val transactionalId = "txn1" + def testVerificationForTransactionalPartitionsOnly(): Unit = { +val tp0 = new TopicPartition(topic, 0) +val tp1 = new TopicPartition(topic, 1) val producerId = 24L val producerEpoch = 0.toShort val sequence = 0 - -val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) -val metadataCache = mock(classOf[MetadataCache]) +val node = new Node(0, "host1", 0) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) -val replicaManager = new ReplicaManager( - metrics = metrics, - config = config, - time = time, - scheduler = new MockScheduler(time), - logManager = mockLogMgr, - quotaManagers = quotaManager, - metadataCache = metadataCache, - logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterPartitionManager = alterPartitionManager, - addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) - +val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node) try { - val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1, List(0, 1))) - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) - // We must set up the metadata cache to handle the append and verification. 
- val metadataResponseTopic = Seq(new MetadataResponseTopic() -.setName(Topic.TRANSACTION_STATE_TOPIC_NAME) -.setPartitions(Seq( - new MetadataResponsePartition() -.setPartitionIndex(0) -.setLeaderId(0)).asJava)) - val node = new Node(0, "host1", 0) + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) - when(metadataCache.contains(tp)).thenReturn(true) - when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) - when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node)) - when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) - - // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error. - KafkaRequestHandler.setBypassThreadCheck(true) + // If we supply no transactional ID and idempotent records, we do not verify. + val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + appendRecords(replicaManager, tp0, idempotentRecords) + verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]()) + assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + + // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records. 
+ val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1, +new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava +)) + + val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0)) + verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]()) + assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(null, get
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       if (duplicateBatch.isPresent) {
         return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
       }
+
+      // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+      // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+      // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+      // There are two phases -- the first append to the log and subsequent appends.
+      //
+      // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+      // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+      // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+      // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+      // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+      // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+      //
+      // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+      // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+      // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+      // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+      if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+        if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null))
Review Comment: This is a holdover from the previous PR that also included tentative state. Since we may move that, I can make this all one. (But I may make it a helper since this is quite a lot of logic.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
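The two-phase check in the quoted comment can be sketched as a self-contained toy model. The class and method names below are illustrative stand-ins, not Kafka's actual implementation: each verification attempt creates a unique guard object, an abort invalidates it, and an append is only allowed with an ongoing transaction or the exact guard instance that started the still-current verification.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the verification-guard idea (hypothetical names, not Kafka's API).
class ToyVerificationState {
    private final Map<Long, Object> verificationGuards = new HashMap<>();
    private final Map<Long, Boolean> ongoingTxns = new HashMap<>();

    // Phase 1: starting verification creates a unique guard object for the producer,
    // unless the transaction is already ongoing (phase 2), in which case no guard is needed.
    Object maybeStartVerification(long producerId) {
        if (ongoingTxns.getOrDefault(producerId, false))
            return null;
        return verificationGuards.computeIfAbsent(producerId, id -> new Object());
    }

    // An abort marker clears both the transaction and the guard, so a stale
    // "verified" response carrying the old guard can no longer authorize an append.
    void abort(long producerId) {
        ongoingTxns.remove(producerId);
        verificationGuards.remove(producerId);
    }

    // Final pre-append check: allow only with an ongoing transaction or the
    // identical (reference-equal) guard instance -- this defeats the ABA race.
    boolean mayAppend(long producerId, Object requestGuard) {
        if (ongoingTxns.getOrDefault(producerId, false))
            return true;
        Object current = verificationGuards.get(producerId);
        return requestGuard != null && requestGuard == current;
    }
}
```

A stale guard from before an abort is rejected, while a fresh retry creates a new guard and can proceed through verification again.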
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221909838 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
+
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard; otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
Review Comment: This is also used in UnifiedLog when we do the final check. But I can make it package private.
[GitHub] [kafka] yashmayya commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode
yashmayya commented on PR #13530: URL: https://github.com/apache/kafka/pull/13530#issuecomment-1581192139 > Modifying the status parts of the REST API seems less like introducing further divergence and more like honestly reporting that existing divergence to users. Fair enough, I've pushed a change (which is oddly not showing up on the GitHub PR UI, although it does show up on my fork's branch which this PR is created from) where the connector's status is updated to failed and added some tests. I'm wondering whether we should also add some verbiage like `If there were any tasks running previously, they will continue running` to make it clear to users that it's only the connector instance that has failed as a result of not being able to reconfigure its tasks on startup (either first time or post update) / restart / resume. > Regardless, you're correct that we can't really alter the infinite-retry logic in distributed mode without a KIP. I do think a long-term fix for this problem would involve aligning how the two modes handle this kind of failure. Would love to see that if you have the time 😄 I'll add it to my TODO list if you think it's worth fixing 😄
[GitHub] [kafka] jolshan merged pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients
jolshan merged PR #13811: URL: https://github.com/apache/kafka/pull/13811
[GitHub] [kafka] dajac commented on pull request #13823: MINOR: Move MockTime to server-common
dajac commented on PR #13823: URL: https://github.com/apache/kafka/pull/13823#issuecomment-1581188466 @mimaison @showuon @dengziming Would one of you have time to review this?
[jira] [Commented] (KAFKA-15056) Kafka producer still fails with ClusterAuthorizationException after permission granted
[ https://issues.apache.org/jira/browse/KAFKA-15056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730199#comment-17730199 ] Kirk True commented on KAFKA-15056: --- [~tophei] is this fixed by [KAFKA-13668|https://issues.apache.org/jira/browse/KAFKA-13668]? cc [~pnee] > Kafka producer still fails with ClusterAuthorizationException after > permission granted > -- > > Key: KAFKA-15056 > URL: https://issues.apache.org/jira/browse/KAFKA-15056 > Project: Kafka > Issue Type: Bug >Reporter: Jeff >Priority: Major > > Hi team, we are using kafka client 3.1.2 in the application. When initiating > a KafkaProducer without explicitly configuring idempotent write, it failed > with ClusterAuthorizationException, which is expected since idempotent > write is enabled by default. But after we granted the producer principal the > IDEMPOTENT_WRITE permission, the producer still failed with the same error until > we restarted the application and re-initiated the producer. > After checking the log and stacktrace, the code fails at this line > [https://github.com/apache/kafka/blame/3.1.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1000] > and in turn throws an exception at this line > [https://github.com/apache/kafka/blob/3.1.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1125] > It appears the ACL check does not happen dynamically at runtime, considering > that 'currentState' was still not set to a correct value after the permission was > granted. Besides, was the check of > 'transactionManager.isTransaction()' omitted on purpose at this line in 3.1.2? > [https://github.com/apache/kafka/blob/3.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#LL988C70-L988C70] > This check seemed to make sense, since only a transactional producer needs to > further call 'transactionManager.maybeAddPartitionToTransaction(tp);'? 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
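Since the issue above stems from idempotent write being enabled by default, one stopgap while permissions or a client fix land is to disable idempotence explicitly, so the producer never requires the IDEMPOTENT_WRITE ACL. A minimal sketch, using standard Kafka client config keys; the class name and bootstrap address are illustrative, and disabling idempotence trades away duplicate-protection and ordering guarantees:

```java
import java.util.Properties;

// Hypothetical helper building a non-idempotent producer config as a workaround
// for ClusterAuthorizationException when IDEMPOTENT_WRITE is not yet granted.
class NonIdempotentProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Explicitly off: the producer then never attempts InitProducerId with
        // a cluster-level IDEMPOTENT_WRITE requirement.
        props.put("enable.idempotence", "false");
        props.put("acks", "1"); // acks=all is only mandatory when idempotence is on
        return props;
    }
}
```

Once the ACL is in place, removing these overrides (and restarting the producer, per the report above) restores the default idempotent behavior.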
[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730195#comment-17730195 ] Kirk True commented on KAFKA-14445: --- [~ocadaruma] my apologies, I didn't read your initial comments carefully enough. You're correct, my patch does nothing to handle this case, so a patch would be welcomed :) > Producer doesn't request metadata update on REQUEST_TIMED_OUT > - > > Key: KAFKA-14445 > URL: https://issues.apache.org/jira/browse/KAFKA-14445 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Major > > Produce requests may fail with a timeout after `request.timeout.ms` in two > cases: > * No produce response was received within `request.timeout.ms` > * A produce response was received, but it ended up with `REQUEST_TIMED_OUT` on the > broker > The former case usually happens when a broker machine fails or there's a > network glitch, etc. > In that case, the connection is disconnected and a metadata update is > requested to discover the new leader: > [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] > > The problem is the latter case (REQUEST_TIMED_OUT on the broker). > In this case, the produce request ends up with a TimeoutException, > which doesn't inherit InvalidMetadataException, so it doesn't trigger a metadata > update. > > The typical cause of REQUEST_TIMED_OUT is replication delay due to a follower-side > problem, for which a metadata update indeed doesn't make much sense. > > However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT > can cause produce requests to retry unnecessarily, which may end with > batch expiration due to the delivery timeout. > Below is the scenario we experienced: > * Environment: > ** Partition tp-0 has 3 replicas: 1, 2, 3. Leader is 1 > ** min.insync.replicas=2 > ** acks=all > * Scenario: > ** Broker 1 "partially" failed > *** It lost its ZooKeeper connection and was kicked out of the cluster > There was a controller log like: > {code:java} > [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , > deleted brokers: 1, bounced brokers: {code} > *** However, somehow the broker was able to continue receiving produce > requests > We're still investigating how this is possible. > Indeed, broker 1 was somewhat "alive" and kept working according to > server.log > *** In other words, broker 1 became a "zombie" > ** Broker 2 was elected as the new leader > *** Broker 3 became a follower of broker 2 > *** However, since broker 1 was still out of the cluster, it didn't receive > LeaderAndIsr, so broker 1 kept considering itself the leader of tp-0 > ** Meanwhile, the producer kept sending produce requests to broker 1, and the > requests failed with REQUEST_TIMED_OUT because no brokers replicated > from broker 1 > *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer didn't > have a chance to refresh its stale metadata > > So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT exception, > to address the case where the old leader has become a "zombie"
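The suggestion in the issue above amounts to widening the set of produce errors that trigger a client-side metadata refresh. A minimal sketch of that decision logic, using a stand-in enum rather than Kafka's real `Errors` class (the actual client keys off exception types such as `InvalidMetadataException` subclasses, not a switch like this):

```java
// Illustrative policy sketch: which produce errors should force a metadata refresh.
// The enum constants mirror names from org.apache.kafka.common.protocol.Errors,
// but this class is a toy stand-in, not the Sender's real code path.
class MetadataRefreshPolicy {
    enum ProduceError { NONE, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION, REQUEST_TIMED_OUT, CORRUPT_MESSAGE }

    static boolean shouldRefreshMetadata(ProduceError error) {
        switch (error) {
            // Today: errors in the InvalidMetadataException family refresh metadata.
            case NOT_LEADER_OR_FOLLOWER:
            case UNKNOWN_TOPIC_OR_PARTITION:
                return true;
            // Proposed: also refresh on broker-side timeouts, so a producer stuck
            // on a "zombie" leader can discover the new one before delivery timeout.
            case REQUEST_TIMED_OUT:
                return true;
            default:
                return false;
        }
    }
}
```

The cost of the proposal is an occasional unnecessary refresh when the timeout really was replication delay, which the reporter acknowledges is benign compared to batch expiration against a stale leader.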
[jira] [Updated] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13531: - Attachment: org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > 
{quote}java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at > org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: > Deleting offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.
[jira] [Commented] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730190#comment-17730190 ] John Roesler commented on KAFKA-13531: -- Seen again while verifying the 3.5.0 release artifacts (this was the only test failure): Gradle Test Run :streams:test > Gradle Test Executor 554 > NamedTopologyIntegrationTest > shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED java.lang.AssertionError: Did not receive all 3 records from topic output-stream-1 within 6 ms, currently accumulated data is [] Expected: is a value equal to or greater than <3> but: <0> was less than <3> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:353) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699) at org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563) logs: [^org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout] > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. 
Sax >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > {quote}java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at > org.apache.kafka.streams.processor.internals.
[jira] [Created] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader
Greg Harris created KAFKA-15069: --- Summary: Refactor scanning hierarchy out of DelegatingClassLoader Key: KAFKA-15069 URL: https://issues.apache.org/jira/browse/KAFKA-15069 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris The DelegatingClassLoader is involved in both scanning and using the results of scanning to process classloading. Instead, the scanning should take place outside of the DelegatingClassLoader, and results of scanning be passed back into the DelegatingClassLoader for classloading functionality.
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260: URL: https://github.com/apache/kafka/pull/13260#issuecomment-1581120917 I have not, but I will aim to carry out the check in the upcoming days.
[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-10337: --- Assignee: Erik van Oosten (was: David Jacot) > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Assignee: Erik van Oosten >Priority: Major > Fix For: 3.6.0 > > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
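The barrier semantics the JavaDoc promises can be modeled in a few lines. The classes below are toys (not the real ConsumerCoordinator): the key point is that commitSync must drain pending async-commit callbacks before any empty-offsets early return, so `commitSync(Collections.emptyMap())` still works as a rebalance barrier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Toy model of the commitSync() callback-flushing guarantee discussed in KAFKA-10337.
class ToyCoordinator {
    private final Deque<Runnable> pendingAsyncCallbacks = new ArrayDeque<>();

    // commitAsync registers a callback that completes on a later poll/commit.
    void commitAsync(Runnable callback) {
        pendingAsyncCallbacks.add(callback);
    }

    // Fixed behavior: flush pending callbacks BEFORE the empty-map early return.
    // The reported bug was returning early on an empty map without flushing.
    void commitSync(Map<?, ?> offsets) {
        while (!pendingAsyncCallbacks.isEmpty())
            pendingAsyncCallbacks.poll().run();
        if (offsets.isEmpty())
            return; // nothing to commit, but callbacks have already been invoked
        // ... synchronous commit of `offsets` would happen here ...
    }
}
```

With this ordering, a caller doing manual commits can rely on an empty-map commitSync as an in-flight-commit barrier before a rebalance, matching the documented contract.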
[jira] [Commented] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation
[ https://issues.apache.org/jira/browse/KAFKA-15051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730156#comment-17730156 ] ASF GitHub Bot commented on KAFKA-15051: C0urante opened a new pull request, #520: URL: https://github.com/apache/kafka-site/pull/520 Ports the changes from https://github.com/apache/kafka/pull/13803 back through 3.2, the version that originally added this endpoint. > docs: add missing connector plugin endpoint to documentation > > > Key: KAFKA-15051 > URL: https://issues.apache.org/jira/browse/KAFKA-15051 > Project: Kafka > Issue Type: Task > Components: docs, documentation >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Minor > > The GET /plugin/config endpoint added in > [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions] > is not included in the Connect documentation page: > https://kafka.apache.org/documentation/#connect_rest
[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-10337: --- Assignee: David Jacot > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Assignee: David Jacot >Priority: Major > Fix For: 3.6.0 > > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1581026296 I see. TBH I haven't followed that migration of late. I think there are build failures as well which I haven't addressed yet.
[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on PR #13678: URL: https://github.com/apache/kafka/pull/13678#issuecomment-1581020841 > @erikvanoosten It seems that you don't have an account for jira so I can't assign the ticket to you. You should request one. I do have an account. My Jira userid is erikvanoosten.
[GitHub] [kafka] C0urante commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
C0urante commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1581020451 @vvcephei haha, no worries! I'll leave Jenkins alone if I see you in the neighborhood testing again 🙏
[GitHub] [kafka] C0urante merged pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
C0urante merged PR #13803: URL: https://github.com/apache/kafka/pull/13803
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1581002727 Ah, I hadn't noticed that this was only introduced in https://github.com/apache/kafka/pull/13465. In that case, the good news is that this bug hasn't made it into a release yet! And the bad news is that it's a regression and we have to merge a fix before putting out 3.6.0 😅
[jira] [Resolved] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-10337. - Fix Version/s: 3.6.0 Resolution: Fixed > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Priority: Major > Fix For: 3.6.0 > > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
dajac commented on PR #13678: URL: https://github.com/apache/kafka/pull/13678#issuecomment-1580998335 @erikvanoosten It seems that you don't have an account for jira so I can't assign the ticket to you. You should request one.

[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-10337: --- Assignee: (was: Kirk True) > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Priority: Major
[GitHub] [kafka] dajac merged pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
dajac merged PR #13678: URL: https://github.com/apache/kafka/pull/13678
[GitHub] [kafka] vamossagar12 commented on pull request #12306: KAFKA-13976: Improvements for OpenAPI specs
vamossagar12 commented on PR #12306: URL: https://github.com/apache/kafka/pull/12306#issuecomment-1580988777 Converting to draft. Need some time before I get to this.
[GitHub] [kafka] vamossagar12 closed pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…
vamossagar12 closed pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto… URL: https://github.com/apache/kafka/pull/12485
[jira] [Assigned] (KAFKA-14131) KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14131: - Assignee: Sambhav Jain (was: Sagar Rao) > KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop > -- > > Key: KAFKA-14131 > URL: https://issues.apache.org/jira/browse/KAFKA-14131 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Justinwins >Assignee: Sambhav Jain >Priority: Major > > When a herder starts, its KafkaOffsetBackingStore will readToLogEnd() on the DistributedHerder.herderExecutor thread named "Distributed-connect-", e.g. Distributed-connect-28-1, which may take a few minutes. > If another thread tries to shut down this herder, it will block for "task.shutdown.graceful.timeout.ms" before the > DistributedHerder.herderExecutor is interrupted. > And if the thread in DistributedHerder.herderExecutor is interrupted, > KafkaOffsetBackingStore.readToLogEnd() will poll(Integer.MAX_VALUE) and log "Error polling" because the read has been interrupted; "consumer.position" then > never advances, and readToLogEnd() falls into an infinite loop.
> > {code:java}
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop never exits
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // while the thread is in interrupted state, consumer.position() will not advance
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // here, poll() catches InterruptedException and logs it without rethrowing
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> } {code}
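The failure mode quoted above can be reproduced in a standalone sketch. All names here are hypothetical stand-ins that simulate, rather than use, the real consumer: when the thread's interrupt flag is set, poll() swallows the interruption and the position never advances, so the end-offset check can never be satisfied. Checking the interrupt flag inside the loop is one possible way to bail out instead of spinning forever.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal simulation of KafkaBasedLog#readToLogEnd(); names are illustrative.
public class Main {
    static long position = 0;          // stands in for consumer.position(tp)
    static final long END_OFFSET = 5;

    // Mimics the poll() behavior described in the ticket: interruption is
    // caught and logged, so the position stays stuck.
    static void poll() {
        if (Thread.currentThread().isInterrupted()) {
            System.out.println("Error polling"); // logged, no progress made
        } else {
            position++;                          // normal progress
        }
    }

    // Returns true if the log end was reached; false if we bailed out.
    static boolean readToLogEnd(int maxIterations) {
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("tp-0", END_OFFSET);
        int iterations = 0;
        while (!endOffsets.isEmpty()) {
            if (iterations++ >= maxIterations) return false; // would spin forever
            Iterator<Map.Entry<String, Long>> it = endOffsets.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (position >= entry.getValue()) {
                    it.remove();
                } else {
                    // One possible guard against the infinite loop:
                    if (Thread.currentThread().isInterrupted()) return false;
                    poll();
                    break;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("normal=" + readToLogEnd(100));      // reaches end

        position = 0;
        Thread.currentThread().interrupt();                     // simulate shutdown
        System.out.println("interrupted=" + readToLogEnd(100)); // bails out
        Thread.interrupted();                                   // clear the flag
    }
}
```

Without the interrupt-flag guard, the interrupted run would exhaust `maxIterations` without ever removing the partition, which is the unbounded spin the ticket describes.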
[GitHub] [kafka] C0urante commented on pull request #13821: MINOR: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on PR #13821: URL: https://github.com/apache/kafka/pull/13821#issuecomment-1580969721 @gharris1727 can we start filing Jira tickets for these changes to classloading logic? They're large enough that I don't think `MINOR` is warranted anymore, and it'd be nice to have a picture of how they fit into the high-level implementation plan for KIP-898.