[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I  got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
@KafkaListener in that case producer publish 100W data to partition 4 in 
several minutes, and one consumer consume the data from the partition.

 

 

  was:
I  got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
@KafkaListener in that case producer publish 100W data to partition 4 in one 
minute, and one consumer consume the data from the partition.

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I  got ?? "Fetch position FetchPosition{offset=42574305, 
> offsetEpoch=Optional[2214], 
> currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
> cn-north-1d)], epoch=2214}} is out of range for partition 
> vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
> @KafkaListener in that case producer publish 100W data to partition 4 in 
> several minutes, and one consumer consume the data from the partition.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1222426539


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.
+  // This guarantees that transactional records are never written to 
the log outside of the transaction coordinator's knowledge of an open 
transaction on
+  // the partition. If we do not have an ongoing transaction or 
correct guard, return an error and do not append.
+  // There are two phases -- the first append to the log and 
subsequent appends.
+  //
+  // 1. First append: Verification starts with creating a verification 
guard object, sending a verification request to the transaction coordinator, and
+  // given a "verified" response, continuing the append path. (A 
non-verified response throws an error.) We create the unique verification guard 
for the transaction
+  // to ensure there is no race between the transaction coordinator 
response and an abort marker getting written to the log. We need a unique guard 
because we could
+  // have a sequence of events where we start a transaction 
verification, have the transaction coordinator send a verified response, write 
an abort marker,
+  // start a new transaction not aware of the partition, and receive 
the stale verification (ABA problem). With a unique verification guard object, 
this sequence would not
+  // result in appending to the log and would return an error. The 
guard is removed after the first append to the transaction and from then, we 
can rely on phase 2.
+  //
+  // 2. Subsequent appends: Once we write to the transaction, the 
in-memory state currentTxnFirstOffset is populated. This field remains until the
+  // transaction is completed or aborted. We can guarantee the 
transaction coordinator knows about the transaction given step 1 and that the 
transaction is still
+  // ongoing. If the transaction is expected to be ongoing, we will 
not set a verification guard. If the transaction is aborted, 
hasOngoingTransaction is false and
+  // requestVerificationGuard is null, so we will throw an error. A 
subsequent produce request (retry) should create verification state and return 
to phase 1.
+  if (!hasOngoingTransaction(batch.producerId) && 
batchMissingRequiredVerification(batch, requestVerificationGuard))

Review Comment:
   Is it safe to call hasOngoingTransaction when the batch is not 
transactional? It may be better to keep checking batch.isTransactional first.
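   
   As a side note for readers, here is a minimal, self-contained Java sketch of the two-phase check the comment block above describes. Every name here (Batch, AppendOrigin, the guard object) is a simplified stand-in rather than Kafka's actual types; the real logic lives in UnifiedLog.scala:
   
   enum AppendOrigin { CLIENT, COORDINATOR, REPLICATION }
   
   record Batch(long producerId, boolean isTransactional) {}
   
   class VerificationSketch {
       // Phase 2 state: producerId -> first offset of its ongoing transaction.
       private final java.util.Map<Long, Long> currentTxnFirstOffset = new java.util.HashMap<>();
   
       boolean hasOngoingTransaction(long producerId) {
           return currentTxnFirstOffset.containsKey(producerId);
       }
   
       // Phase 1 state: the unique guard object created when verification began.
       // Checking isTransactional first (as the review comment above suggests)
       // keeps the guard logic from ever running for non-transactional batches.
       boolean batchMissingRequiredVerification(Batch batch, AppendOrigin origin, Object guard) {
           return batch.isTransactional() && origin == AppendOrigin.CLIENT && guard == null;
       }
   
       void maybeReject(Batch batch, AppendOrigin origin, Object guard) {
           if (!hasOngoingTransaction(batch.producerId())
                   && batchMissingRequiredVerification(batch, origin, guard)) {
               throw new IllegalStateException("Transactional append without ongoing transaction or verification guard");
           }
       }
   }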



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I  got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
@KafkaListener in that case producer publish 100W data to partition 4 in one 
minute, and one consumer consume the data from the partition.

 

 

  was:
I ??got Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset ??

when i consumer kafka through @KafkaListener in that case producer publish 100W 
data to partition 4 in one minute, and one consumer consume the data from the 
partition.

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I  got ?? "Fetch position FetchPosition{offset=42574305, 
> offsetEpoch=Optional[2214], 
> currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
> cn-north-1d)], epoch=2214}} is out of range for partition 
> vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
> @KafkaListener in that case producer publish 100W data to partition 4 in one 
> minute, and one consumer consume the data from the partition.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730365#comment-17730365
 ] 

YaYun Wang edited comment on KAFKA-15074 at 6/8/23 3:30 AM:


*And here are the debug logs of the kafka-client:*

2023-06-06 09:12:59.576+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer 
clientId=consumer-group-1, groupId=group] Committed offset 42574009 for 
partition myTopic-4
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-1 at position FetchPosition{offset=2776251, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: 
cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-7 at position FetchPosition{offset=1678666, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: 
cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer 
clientId=consumer-group-1, groupId=group] Sending READ_UNCOMMITTED 
IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), 
implied=(myTopic-1, myTopic-7), canUseTopicIds=True) to broker host:9093 (id: 2 
rack: cn-north-1d)
2023-06-06 09:12:59.963+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer 
clientId=consumer-group-1, groupId=group] Committed offset 42574109 for 
partition myTopic-4
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-8 at position FetchPosition{offset=2559637, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-0 at position FetchPosition{offset=1678355, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-3 at position FetchPosition{offset=2773458, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=2171}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-2 at position FetchPosition{offset=1677672, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-5 at position FetchPosition{offset=1678870, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=1999}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] 
[traceId=] DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-6 at position FetchPosition{offset=1994802, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+

[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener. In that case, the producer publishes 100W (1,000,000) records to partition 4 within one minute, and one consumer consumes the data from the partition.

  was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener.

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"
> when I consume from Kafka through @KafkaListener. In that case, the producer publishes 100W (1,000,000) records to partition 4 within one minute, and one consumer consumes the data from the partition.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730365#comment-17730365
 ] 

YaYun Wang commented on KAFKA-15074:


And here are my debug logs from the kafka-client:

2023-06-06 09:12:59.576+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer 
clientId=consumer-group-1, groupId=group] Committed offset 42574009 for 
partition myTopic-4
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-1 at position FetchPosition{offset=2776251, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: 
cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-7 at position FetchPosition{offset=1678666, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 2 rack: 
cn-north-1d)], epoch=}} to node host:9093 (id: 2 rack: cn-north-1d)
2023-06-06 09:12:59.577+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer 
clientId=consumer-group-1, groupId=group] Sending READ_UNCOMMITTED 
IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), 
implied=(myTopic-1, myTopic-7), canUseTopicIds=True) to broker host:9093 (id: 2 
rack: cn-north-1d)
2023-06-06 09:12:59.963+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.i.ConsumerCoordinator - [handle] [Consumer 
clientId=consumer-group-1, groupId=group] Committed offset 42574109 for 
partition myTopic-4
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-8 at position FetchPosition{offset=2559637, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-0 at position FetchPosition{offset=1678355, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-3 at position FetchPosition{offset=2773458, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=2171}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-2 at position FetchPosition{offset=1677672, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=2001}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-5 at position FetchPosition{offset=1678870, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 0 rack: 
cn-north-1a)], epoch=1999}} to node host:9093 (id: 0 rack: cn-north-1a)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [prepareFetchRequests] [Consumer 
clientId=consumer-group-1, groupId=group] Added READ_UNCOMMITTED fetch request 
for partition myTopic-6 at position FetchPosition{offset=1994802, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[host:9093 (id: 1 rack: 
cn-north-1b)], epoch=1996}} to node host:9093 (id: 1 rack: cn-north-1b)
2023-06-06 09:12:59.964+ [istenerEndpointContainer#0-0-C-1] [traceId=] 
DEBUG o.a.k.c.c.internals.Fetcher - [sendFetches] [Consumer 
clientId=consumer-group-1, g

[jira] [Commented] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730360#comment-17730360
 ] 

YaYun Wang commented on KAFKA-15074:


*Here is my code:*

@KafkaListener(
        topics = "myTopic",
        containerFactory = "myContainerFactory")
public void consume(ConsumerRecord<?, String> consumerRecord)
        throws ExecutionException, InterruptedException {
    String recordValue = consumerRecord.value();

    // do something with recordValue
}
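
For context on the "resetting offset" message in this report: when a consumer's fetch position falls outside the broker's available log range, the client resets its position according to auto.offset.reset. Below is a minimal sketch of the relevant consumer configuration; the broker address, group id, and chosen reset policy are illustrative assumptions, not the reporter's actual settings:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9093"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // Governs what happens on "offset out of range": "earliest", "latest",
        // or "none" ("none" makes the consumer throw instead of resetting).
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }
}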

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"
> when I consume from Kafka through @KafkaListener.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener.

  was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener:

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"
> when I consume from Kafka through @KafkaListener.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener.

  was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"
> when I consume from Kafka through @KafkaListener.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-07 Thread YaYun Wang (Jira)
YaYun Wang created KAFKA-15074:
--

 Summary: offset out of range for partition xxx, resetting offset
 Key: KAFKA-15074
 URL: https://issues.apache.org/jira/browse/KAFKA-15074
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.3.2
Reporter: YaYun Wang


I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset"

when I consume from Kafka through @KafkaListener:

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-07 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1222332377


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RocksDBTimeOrderedKeyValueBufferTest {
+public RocksDBTimeOrderedKeyValueBuffer buffer;
+@Mock
+public SerdeGetter serdeGetter;
+public InternalProcessorContext context;
+public StreamsMetricsImpl streamsMetrics;
+@Mock
+public Sensor sensor;
+public long offset;
+
+@Before
+public void setUp() {
+when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
+when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+final Metrics metrics = new Metrics();
+offset = 0;
+streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+context = new 
MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
+}
+
+public void createJoin(final Duration grace) {
+final RocksDBTimeOrderedKeyValueSegmentedBytesStore store = new 
RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();

Review Comment:
   For testing this was mostly just me picking a number I was sure would work.
   
   Putting it at the grace period or higher would be the correct thing outside of a test.
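   
   As a rough sketch, tying the store's retention to the grace period (reusing the supplier call quoted above; the Duration value is an arbitrary assumption) would look like:
   
   // Hypothetical: derive retention from the buffer's grace period instead of
   // a hard-coded 100, so that retention >= grace always holds.
   final Duration grace = Duration.ofMillis(100);
   final RocksDBTimeOrderedKeyValueSegmentedBytesStore store = new
       RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing", grace.toMillis()).get();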



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-07 Thread via GitHub


mumrah opened a new pull request, #13827:
URL: https://github.com/apache/kafka/pull/13827

   This patch uses a GitHub workflow to mark inactive PRs with the `stale` label. The workflow uses the ["stale" action](https://github.com/actions/stale), which has many features, including a number of filtering options and the ability to auto-close issues.
   
   For this patch, I've disabled auto-closing PRs and just have it add the label to PRs with more than 90 days of inactivity. I've also enabled the "dry-run" option while we discuss the configuration.
   
   The workflow will run nightly at 3:30 UTC. It can also be triggered manually.
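   
   As a rough sketch, the configuration described above would look something like the following (the option names come from the actions/stale documentation; the exact values are assumptions pending this discussion):
   
   name: Stale PRs
   on:
     schedule:
       - cron: '30 3 * * *'    # nightly at 3:30 UTC
     workflow_dispatch: {}     # allows manual triggering
   jobs:
     stale:
       runs-on: ubuntu-latest
       steps:
         - uses: actions/stale@v8
           with:
             debug-only: true          # dry-run while the configuration is discussed
             days-before-pr-stale: 90  # label PRs after 90 days of inactivity
             days-before-pr-close: -1  # never auto-close
             stale-pr-label: 'stale'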


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15073) Automation for old/inactive PRs

2023-06-07 Thread David Arthur (Jira)
David Arthur created KAFKA-15073:


 Summary: Automation for old/inactive PRs
 Key: KAFKA-15073
 URL: https://issues.apache.org/jira/browse/KAFKA-15073
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: David Arthur


Following from a discussion on the mailing list: it would be nice to automatically triage inactive PRs. There are currently over 1,000 open PRs. Most likely, a majority of these will never be merged and should be closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1222312423


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.Coordinator;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+/**
+ * The group coordinator replicated state machine that manages the metadata of 
all generic and
+ * consumer groups. It holds the hard and the soft state of the groups. This 
class has two kinds
+ * of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class ReplicatedGroupCoordinator implements Coordinator {

Review Comment:
   Is this called replicated because we replicate the state? (In other words, 
this is the implementation to get the hard state we already have for the 
current group coordinator)
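   
   For readers skimming, here is a toy Java model of the two method kinds the javadoc describes; the names are simplified stand-ins, not the actual Kafka types. Handlers return a response plus records, the runtime persists the records, and replay() is the only path that mutates the hard state, both on writes and during loading:
   
   import java.util.ArrayList;
   import java.util.List;
   
   class ToyCoordinator {
       private final List<String> hardState = new ArrayList<>();
   
       record Result(String response, List<String> records) {}
   
       // 1) Request handler: computes a response and records, mutates nothing.
       Result handle(String memberId) {
           return new Result("ok", List.of("member-joined:" + memberId));
       }
   
       // 2) Replay: applies a persisted record to the hard state; also used
       //    when loading the partition's records at startup.
       void replay(String record) {
           hardState.add(record);
       }
   }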



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-07 Thread via GitHub


jolshan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1581669973

   > No, we can't - it all depends on the retries and the delivery timeout.
   Do you think that causes a problem?
   
   I guess I just need to clarify what retried batches are here -- is the idea 
that we wait for inflight batches to return a response or time out? What if the 
response triggers another retry? Would we prevent that from sending out?
   
   I'm also wondering about the benefit of preserving the previous batches if there is an error. How does the system recover differently if we allow those batches to "complete"? I think we could run into cases where the error causes the inflight batches to be unable to be written. Do we prefer to fail them (what we may do now) and start clean, or try to write them with new sequences? I can see both scenarios causing issues.
   
   I guess it boils down to availability of writes (rewriting the sequences allows us to continue writing) versus idempotency correctness (trying to wait for them to complete with their old sequences). The sticking point I'm running into is why getting those extra inflight requests (potentially) written is better if we've hit a non-retriable error.
   
   Maybe I just need an example :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581658284

   > I'm not sure I follow. Are you saying that the test would set the thread 
local before running? So a single thread may set itself back and forth within 
the suite? (If you have a commit with the change, that may also help me 
understand)
   
   I have updated the PR with the `ThreadLocal`-based approach. This approach results in far fewer changes to the surrounding classes. Please take a look when you have time. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581633134

   > Are you saying that the test would set the thread local before running? So 
a single thread may set itself back and forth within the suite?
   
   Precisely. That's in effect what it does now in the tests: either `SENDER` 
or `APPLICATION` is passed in to the various `TransactionManager` calls.
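   
   A minimal sketch of that ThreadLocal idea, using the SENDER/APPLICATION values mentioned above; the class and method names here are made up for illustration and may differ from the PR:
   
   // Each thread marks itself once; the transaction manager can then decide
   // whether an illegal state should poison the producer (sender thread) or
   // simply throw (application/test thread).
   public final class CallerContext {
       public enum Caller { SENDER, APPLICATION }
   
       private static final ThreadLocal<Caller> CALLER =
           ThreadLocal.withInitial(() -> Caller.APPLICATION);
   
       public static void set(Caller caller) { CALLER.set(caller); }
   
       public static boolean isSender() { return CALLER.get() == Caller.SENDER; }
   }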
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pjpringle commented on pull request #12540: bug: State stores lose state when tasks are reassigned under EOS wit…

2023-06-07 Thread via GitHub


pjpringle commented on PR #12540:
URL: https://github.com/apache/kafka/pull/12540#issuecomment-1581567216

   Is this bug EOS-specific, given the title? Looking at the fix, I don't believe it is. I have been seeing similar stale-state issues on rebalance in a non-EOS setup, but with standby replicas configured.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581564571

   > No, the test method would need to specifically mark its thread as wanting 
to poison the thread or not before running the rest of the test. In the 
TransactionManagerTest, there was only one place I had to specifically call it 
to get the tests to pass.
   
   I'm not sure I follow. Are you saying that the test would set the thread 
local before running? So a single thread may set itself back and forth within 
the suite? (If you have a commit with the change, that may also help me 
understand)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-07 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r107444


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##

Review Comment:
   What's the relationship between the segments store retention period (100, 
currently) and the buffer's grace period (`grace`)? Any reason they shouldn't 
be the same? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15060) Fix the ApiVersionManager interface

2023-06-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15060:
-
Description: (was: Fix Admin.describeFeatures, which was accidentally 
broken by KAFKA-15007.)

> Fix the ApiVersionManager interface
> ---
>
> Key: KAFKA-15060
> URL: https://issues.apache.org/jira/browse/KAFKA-15060
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe opened a new pull request, #13826: KAFKA-15060: fix the ApiVersionManager interface

2023-06-07 Thread via GitHub


cmccabe opened a new pull request, #13826:
URL: https://github.com/apache/kafka/pull/13826

   This PR expands the scope of ApiVersionManager a bit to include returning 
the current MetadataVersion and features that are in effect. This is useful in 
general because that information needs to be returned in an 
ApiVersionsResponse. It also allows us to fix the ApiVersionManager interface 
so that all subclasses implement all methods of the interface. Having 
subclasses that don't implement some methods is dangerous because they could 
cause exceptions at runtime in unexpected scenarios.
   
   On the KRaft controller, we were previously performing a read operation in 
the QuorumController thread to get the current metadata version and features. 
With this PR, we now read a volatile variable maintained by a separate 
MetadataVersionContextPublisher object. This will improve performance and 
simplify the code. It should not change the guarantees we are providing; in 
both the old and new scenarios, we need to be robust against version skew 
scenarios during updates.
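   
   A small Java sketch of the volatile-read pattern described in the last paragraph (all names here are illustrative, not the actual classes in this PR):
   
   import java.util.Map;
   
   final class MetadataVersionHolder {
       record Snapshot(int metadataVersion, Map<String, Short> finalizedFeatures) {}
   
       private volatile Snapshot current = new Snapshot(0, Map.of());
   
       // Called by the metadata publisher when a new image is committed.
       void publish(Snapshot next) { current = next; }
   
       // Called on request-handler threads when building an ApiVersionsResponse;
       // no read operation on the controller thread is needed.
       Snapshot read() { return current; }
   }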


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15060) Fix the ApiVersionManager interface

2023-06-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15060:
-
Summary: Fix the ApiVersionManager interface  (was: Fix 
Admin.describeFeatures)

> Fix the ApiVersionManager interface
> ---
>
> Key: KAFKA-15060
> URL: https://issues.apache.org/jira/browse/KAFKA-15060
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>
> Fix Admin.describeFeatures, which was accidentally broken by KAFKA-15007.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r102869


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CoordinatorRuntimeTest {
+private static final TopicPartition TP = new 
TopicPartition("__consumer_offsets", 0);
+
+/**
+ * A CoordinatorEventProcessor that directly executes the operations. 
This is
+ * useful in unit tests where execution in threads is not required.
+ */
+private static class MockEventProcessor implements 
CoordinatorEventProcessor {
+@Override
+public void enqueue(CoordinatorEvent event) throws 
RejectedExecutionException {
+try {
+event.run();
+} catch (Throwable ex) {
+event.complete(ex);
+}
+}
+
+@Override
+public void close() throws Exception {}
+}
+
+/**
+ * A CoordinatorLoader that always succeeds.
+ */
+private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
+@Override
+public CompletableFuture<Void> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
+return CompletableFuture.completedFuture(null);
+}
+}
+
+/**
+ * An in-memory partition writer that accepts a maximum number of writes.
+ */
+private static class MockPartitionWriter extends InMemoryPartitionWriter<String> {
+private int allowedWrites = 1;
+
+public MockPartitionWriter() {
+this(Integer.MAX_VALUE);
+}
+
+public MockPartitionWriter(int allowedWrites) {
+super(false);
+this.allowedWrites = allowedWrites;
+}
+
+@Override
+public void registerListener(TopicPartition tp, Listener listener) {
+super.registerListener(tp, listener);
+}
+
+@Override
+public void deregisterListener(TopicPartition tp, Listener listener) {
+super.deregisterListener(tp, listener);
+}
+
+@Override
+public long append(TopicPartition tp, List<String> records) throws KafkaException {
+if (allowedWrites-- > 0) {
+return super.append(tp, records);
+} else {
+throw new KafkaException("append failed.");
+}
+}
+}
+
+/**
+ * A simple Coordinator implementation that stores the records into a set.
+ */
+private static class MockCoordinator implements Coordinator<String> {
+private final TimelineHashSet<String> records;
+
+MockCoordinator(
+SnapshotRegistry snapshotRegistry
+) {
+records = new TimelineHashSet<>(snapshotRegistry,

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222198170


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222177376


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CoordinatorRuntimeTest {
+private static final TopicPartition TP = new 
TopicPartition("__consumer_offsets", 0);
+
+/**
+ * A CoordinatorEventProcessor that directly executes the operations. 
This is
+ * useful in unit tests where execution in threads is not required.
+ */
+private static class MockEventProcessor implements 
CoordinatorEventProcessor {
+@Override
+public void enqueue(CoordinatorEvent event) throws 
RejectedExecutionException {
+try {
+event.run();
+} catch (Throwable ex) {
+event.complete(ex);
+}
+}
+
+@Override
+public void close() throws Exception {}
+}
+
+/**
+ * A CoordinatorLoader that always succeeds.
+ */
+private static class MockCoordinatorLoader implements 
CoordinatorLoader<String> {
+@Override
+public CompletableFuture<Void> load(TopicPartition tp, 
CoordinatorPlayback<String> replayable) {
+return CompletableFuture.completedFuture(null);
+}
+}
+
+/**
+ * An in-memory partition writer that accepts a maximum number of writes.
+ */
+private static class MockPartitionWriter extends 
InMemoryPartitionWriter<String> {
+private int allowedWrites = 1;
+
+public MockPartitionWriter() {
+this(Integer.MAX_VALUE);
+}
+
+public MockPartitionWriter(int allowedWrites) {
+super(false);
+this.allowedWrites = allowedWrites;
+}
+
+@Override
+public void registerListener(TopicPartition tp, Listener listener) {
+super.registerListener(tp, listener);
+}
+
+@Override
+public void deregisterListener(TopicPartition tp, Listener listener) {
+super.deregisterListener(tp, listener);
+}
+
+@Override
+public long append(TopicPartition tp, List<String> records) throws 
KafkaException {
+if (allowedWrites-- > 0) {
+return super.append(tp, records);
+} else {
+throw new KafkaException("append failed.");
+}
+}
+}
+
+/**
+ * A simple Coordinator implementation that stores the records into a set.
+ */
+private static class MockCoordinator implements Coordinator<String> {
+private final TimelineHashSet<String> records;
+
+MockCoordinator(
+SnapshotRegistry snapshotRegistry
+) {
+records = new TimelineHashSet<>(snapshotRegistry,

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222175645


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222169738


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222158858


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222148781


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@

[GitHub] [kafka] junrao commented on a diff in pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic

2023-06-07 Thread via GitHub


junrao commented on code in PR #13815:
URL: https://github.com/apache/kafka/pull/13815#discussion_r1222142092


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Utility functions for fetching offsets, validating and resetting positions.
+ */
+class OffsetFetcherUtils {
+private final ConsumerMetadata metadata;
+private final SubscriptionState subscriptionState;
+private final Time time;
+private final ApiVersions apiVersions;
+private final Logger log;
+
+private final AtomicReference<RuntimeException> 
cachedOffsetForLeaderException = new AtomicReference<>();
+private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+OffsetFetcherUtils(LogContext logContext,
+   ConsumerMetadata metadata,
+   SubscriptionState subscriptionState,
+   Time time,
+   ApiVersions apiVersions) {
+this.log = logContext.logger(getClass());
+this.metadata = metadata;
+this.subscriptionState = subscriptionState;
+this.time = time;
+this.apiVersions = apiVersions;
+}
+
+/**
+ * Callback for the response of the list offset call.
+ *
+ * @param listOffsetsResponse The response from the server.
+ * @return {@link OffsetFetcherUtils.ListOffsetResult} extracted from the 
response, containing the fetched offsets
+ * and partitions to retry.
+ */
+OffsetFetcherUtils.ListOffsetResult 
handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse) {
+Map<TopicPartition, ListOffsetData> fetchedOffsets 
= new HashMap<>();
+Set<TopicPartition> partitionsToRetry = new HashSet<>();
+Set<String> unauthorizedTopics = new HashSet<>();
+
+for (ListOffsetsResponseData.ListOffsetsTopicResponse topic : 
listOffsetsResponse.topics()) {
+for (ListOffsetsResponseData.ListOffsetsPartitionResponse 
partition : topic.partitions()) {
+TopicPartition topicPartition = new 
TopicPartition(topic.name(), partition.partitionIndex());
+Errors error = Errors.forCode(partition.errorCode());
+switch (error) {
+case NONE:
+if (!partition.oldStyleOffsets().isEmpty()) {
+// Handle v0 response with offsets
+long offset;
+if (partition.oldStyleOffsets().size() > 1) {
+throw new IllegalStateException("Unexpected 
partitionData response of length " +
+partition.oldStyleOffsets().size());
+} else {
+offset = partition.oldStyleOffsets().get(0);
+}
+log.debug("Handling v0 ListOffsetResponse response 
for {}. Fetched offset {}",
+topicPartition, offset);
+if (offset != ListOffsetsResponse

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222140560


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@

[GitHub] [kafka] jlprat commented on pull request #13824: MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde

2023-06-07 Thread via GitHub


jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-1581463300

   Test failures all seem unrelated; I created the corresponding Jira 
tickets for the ones where I couldn't find an existing one:
   https://issues.apache.org/jira/browse/KAFKA-15070, 
https://issues.apache.org/jira/browse/KAFKA-15071 and 
https://issues.apache.org/jira/browse/KAFKA-15072


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15072) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition

2023-06-07 Thread Josep Prat (Jira)
Josep Prat created KAFKA-15072:
--

 Summary: Flaky test 
MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition
 Key: KAFKA-15072
 URL: https://issues.apache.org/jira/browse/KAFKA-15072
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.5.0
Reporter: Josep Prat


Test 
MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition 
became flaky again, but it's a different error this time.

Occurrence: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testReplicationWithEmptyPartition__/]

 
h3. Error Message
{code:java}
java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not 
start in time on cluster: backup-connect-cluster{code}
h3. Stacktrace
{code:java}
java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not 
start in time on cluster: backup-connect-cluster at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:301)
 at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:912)
 at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicationWithEmptyPartition(MirrorConnectorsIntegrationBaseTest.java:415)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeT

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222117410


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -0,0 +1,803 @@
+/**
+ * An in-memory partition writer that accepts a maximum number of writes.
+ */
+private static class MockPartitionWriter extends 
InMemoryPartitionWriter<String> {
+private int allowedWrites = 1;

Review Comment:
   nit: we don't need to set it to 1, right? It will never be used.
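
   For illustration, a minimal sketch (not the PR's code) of why that
   initializer is dead: the field initializer runs before the constructor
   body, and both constructors shown in the diff reassign the field anyway.

       class BoundedWriter {
           private int allowedWrites = 1; // dead value: every construction path reassigns it

           BoundedWriter() {
               this(Integer.MAX_VALUE); // no-arg constructor delegates to the int constructor
           }

           BoundedWriter(int allowedWrites) {
               this.allowedWrites = allowedWrites; // always overwrites the initializer's 1
           }
       }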



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT

2023-06-07 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-15071:
---
Component/s: core

> Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection 
> for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
> --
>
> Key: KAFKA-15071
> URL: https://issues.apache.org/jira/browse/KAFKA-15071
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Priority: Major
>
> The test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection 
> became flaky again, but it fails for a different reason. In this case it 
> might be a missing cleanup.
> The values of the parameters are Type=ZK, MetadataVersion=3.5-IV2, 
> Security=PLAINTEXT
> Related to https://issues.apache.org/jira/browse/KAFKA-13737
> Occurred: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.admin/LeaderElectionCommandTest/Build___JDK_8_and_Scala_2_123__Type_ZK__Name_testPreferredReplicaElection__MetadataVersion_3_5_IV2__Security_PLAINTEXT/
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> {{ }}
> h3. Standard Output
> {code:java}
> Successfully completed leader election (UNCLEAN) for partitions 
> unclean-topic-0 [2023-06-07 14:42:33,845] ERROR [QuorumController id=3000] 
> writeNoOpRecord: unable to start processing because of 
> RejectedExecutionException. Reason: The event queue is shutting down 
> (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:42,699] 
> WARN [AdminClient clientId=adminclient-65] Connection to node -2 
> (localhost/127.0.0.1:35103) could not be established. Broker may not be 
> available. (org.apache.kafka.clients.NetworkClient:814) Successfully 
> completed leader election (UNCLEAN) for partitions unclean-topic-0 
> [2023-06-07 14:42:44,416] ERROR [QuorumController id=0] writeNoOpRecord: 
> unable to start processing because of RejectedExecutionException. Reason: The 
> event queue is shutting down 
> (org.apache.kafka.controller.QuorumController:467) [2023-06-07 14:42:44,716] 
> WARN maxCnxns is not configured, using default value 0. 
> (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:42:44,765] 
> WARN No meta.properties file under dir 
> /tmp/kafka-2117748934951771120/meta.properties 
> (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:44,986] WARN No 
> meta.properties file under dir /tmp/kafka-5133306871105583937/meta.properties 
> (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,214] WARN No 
> meta.properties file under dir /tmp/kafka-8449809620400833553/meta.properties 
> (kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,634] WARN 
> [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received 
> UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This 
> error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,634] WARN 
> [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received 
> UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. This 
> error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,872] WARN 
> [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received 
> UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-1. This 
> error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:46,010] WARN 
> [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for 
> fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
> maxBytes=10485760, 
> fetchData={__consumer_offsets-3=PartitionData(topicId=vAlEsYVbTFClcpnVRp3AOw, 
> fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 2 
> was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
>  at 
> kafka.server.

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222115234


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -39,7 +40,7 @@ public class MultiThreadedEventProcessor implements 
CoordinatorEventProcessor {
 /**
  * The accumulator.
  */
-private final EventAccumulator<Integer, CoordinatorEvent> accumulator;

Review Comment:
   Is this changed from Integer to TopicPartition so that we can use different 
coordinator state partitions (i.e., consumer offsets vs. transactional state)?
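
   For illustration only (hypothetical names, not the PR's API): keying the
   accumulator by TopicPartition rather than a bare partition id keeps
   per-partition ordering while letting one runtime serve several coordinator
   state topics without id clashes.

       import org.apache.kafka.common.TopicPartition;

       import java.util.ArrayDeque;
       import java.util.HashMap;
       import java.util.Map;
       import java.util.Queue;

       // Hypothetical sketch: events with the same key stay in one queue
       // (serial execution per key); different keys can run concurrently.
       class KeyedEventQueue<K, E> {
           private final Map<K, Queue<E>> queues = new HashMap<>();

           synchronized void add(K key, E event) {
               queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
           }

           synchronized E poll(K key) {
               Queue<E> queue = queues.get(key);
               return queue == null ? null : queue.poll();
           }
       }

       // With K = TopicPartition, __consumer_offsets-0 and a hypothetical
       // __transaction_state-0 get distinct keys, which a bare Integer cannot do.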



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT

2023-06-07 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-15071:
---
Affects Version/s: 3.5.0

> Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection 
> for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT
> --
>
> Key: KAFKA-15071
> URL: https://issues.apache.org/jira/browse/KAFKA-15071
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Priority: Major
>

[jira] [Commented] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-06-07 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730272#comment-17730272
 ] 

Josep Prat commented on KAFKA-14971:


Another occurrence:

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_11_and_Scala_2_13___testSyncTopicConfigs__/

> Flaky Test 
> org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
> --
>
> Key: KAFKA-14971
> URL: https://issues.apache.org/jira/browse/KAFKA-14971
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Major
>  Labels: flaky-test, mirror-maker
>
> The test testSyncTopicConfigs in 
> `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
>  seems to be flaky. Found here: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]
> Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] 
> and it passed.
>  
>  
> {code:java}
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic! ==> expected: <2000> but 
> was: <8640>
> Stacktrace
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic! ==> expected: <2000> but 
> was: <8640>
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
> at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
> at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
> at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
> at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
> at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
> at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
> at 
> app//org.junit.jupiter.engine.e
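
The failing assertion runs inside TestUtils.waitForCondition, which retries the 
check until a timeout before failing the test. A minimal sketch of that retry 
pattern (the timeout is illustrative, and describeTargetTopicConfig is a 
hypothetical helper, not part of the actual test):
{code:java}
// Sketch only: retry the config check until it passes or the wait times out.
// describeTargetTopicConfig is a hypothetical helper for this illustration.
TestUtils.waitForCondition(
    () -> "2000".equals(describeTargetTopicConfig("delete.retention.ms")),
    30_000L,
    "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic!"
);
{code}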

[GitHub] [kafka] junrao commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-07 Thread via GitHub


junrao commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1221908399


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent 
event) {
 manager.addOffsetFetchRequest(event.partitions);
 return true;
 }
+
+private boolean process(final MetadataUpdateApplicationEvent event) {
+metadata.requestUpdateForNewTopics();

Review Comment:
   MetadataUpdateApplicationEvent is only for new topics. Could we give it a 
clearer name? Will there be a separate event for a full metadata update?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection<TopicPartition> partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher

Review Comment:
   Is the TODO still needed?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection<TopicPartition> partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());
+
+log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
+if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+   updateMetadata(time.milliseconds());

Review Comment:
   The existing consumer seems to only update the metadata for new topics. The 
name `updateMetadata` doesn't make it clear that it's for new topics. Could we 
make that explicit?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataUpdateApplicationEvent.java:
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class MetadataUpdateApplicationEvent extends ApplicationEvent {
+
+private final long timestamp;

Review Comment:
   `timestamp` seems unused?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) {
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+// 
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());

Review Comment:
   Is this still needed?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -166,7 +208,7

[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581446668

   > ThreadLocal variables are not as frequently used in Kafka, but I can see a 
potential argument for usage here. One tricky part about thread locals is 
testing. Any ideas on how we could do this? Would we need to spin up a thread 
to assign the different boolean values?
   
   No, the test method would just need to mark its thread as one that should (or 
should not) poison the state before running the rest of the test. In 
`TransactionManagerTest`, there was only one place where I had to call it 
explicitly to get the tests to pass.
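   
   A minimal sketch of what that marking could look like in a test, assuming the 
`setPoisonStateOnInvalidTransition` setter from the diff further down this 
thread; the provoking call and the assertions are illustrative, not the actual 
test:
   
   ```java
   @Test
   public void testInvalidTransitionIsFatalWhenPoisoningEnabled() {
       // Opt this (test) thread into the Sender-thread behavior: an invalid
       // state transition should poison the transaction manager.
       transactionManager.setPoisonStateOnInvalidTransition(true);

       // Provoke an invalid transition; for example, beginTransaction() is not
       // legal before the manager has been initialized. (Illustrative only.)
       assertThrows(IllegalStateException.class, transactionManager::beginTransaction);

       // The manager should now be poisoned into a fatal state.
       assertTrue(transactionManager.hasFatalError());
   }
   ```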





[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky

2023-06-07 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730271#comment-17730271
 ] 

Josep Prat commented on KAFKA-15020:


Seen this test failing with a different error: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/

 
h3. Error Message
{code:java}
java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
Timed out while awaiting expected assignment 
Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The 
current assignment is []{code}
h3. Stacktrace
{code:java}
java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
Timed out while awaiting expected assignment 
Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The 
current assignment is [] at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:206) at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(FetchFromFollowerIntegrationTest.scala:211)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9$adapted(FetchFromFollowerIntegrationTest.scala:211)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:211)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:251)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCol

[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581443881

   ThreadLocal variables are not as frequently used in Kafka, but I can see a 
potential argument for usage here. One tricky part about thread locals is 
testing. Any ideas on how we could do this? Would we need to spin up a thread 
to assign the different boolean values?





[jira] [Resolved] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-06-07 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-14539.
-
Resolution: Fixed

> Simplify StreamsMetadataState by replacing the Cluster metadata with 
> partition info map
> ---
>
> Key: KAFKA-14539
> URL: https://issues.apache.org/jira/browse/KAFKA-14539
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Danica Fine
>Priority: Major
>
> We can clean up the StreamsMetadataState class a bit by removing the 
> #onChange invocation that currently occurs within 
> StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` 
> parameter in that callback. Instead of building a fake Cluster object from 
> the map of partition info when we invoke #onChange inside the 
> StreamsPartitionAssignor#onAssignment method, we can just directly pass in 
> the `Map<TopicPartition, PartitionInfo>` and replace the usage of `Cluster` 
> everywhere in StreamsMetadataState
> (I believe the current system is a historical artifact from when we used to 
> require passing in a {{Cluster}} for the default partitioning strategy, which 
> the StreamsMetadataState needs in order to compute the partition for a key. At some 
> point in the past we provided a better way to get the default partition, so 
> we no longer need a {{Cluster}} parameter/field at all)





[jira] [Updated] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-06-07 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-14539:

Fix Version/s: 3.6.0

> Simplify StreamsMetadataState by replacing the Cluster metadata with 
> partition info map
> ---
>
> Key: KAFKA-14539
> URL: https://issues.apache.org/jira/browse/KAFKA-14539
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Danica Fine
>Priority: Major
> Fix For: 3.6.0
>
>
> We can clean up the StreamsMetadataState class a bit by removing the 
> #onChange invocation that currently occurs within 
> StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` 
> parameter in that callback. Instead of building a fake Cluster object from 
> the map of partition info when we invoke #onChange inside the 
> StreamsPartitionAssignor#onAssignment method, we can just directly pass in 
> the `Map<TopicPartition, PartitionInfo>` and replace the usage of `Cluster` 
> everywhere in StreamsMetadataState
> (I believe the current system is a historical artifact from when we used to 
> require passing in a {{Cluster}} for the default partitioning strategy, which 
> the StreamsMetadataState needs in order to compute the partition for a key. At some 
> point in the past we provided a better way to get the default partition, so 
> we no longer need a {{Cluster}} parameter/field at all)





[jira] [Created] (KAFKA-15071) Flaky test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, MetadataVersion=3.5-IV2, Security=PLAINTEXT

2023-06-07 Thread Josep Prat (Jira)
Josep Prat created KAFKA-15071:
--

 Summary: Flaky test 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection for Type=ZK, 
MetadataVersion=3.5-IV2, Security=PLAINTEXT
 Key: KAFKA-15071
 URL: https://issues.apache.org/jira/browse/KAFKA-15071
 Project: Kafka
  Issue Type: Bug
Reporter: Josep Prat


The test kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection has 
become flaky again, but it is failing for a different reason. In this case it 
might be a missing cleanup.

The values of the parameters are Type=ZK, MetadataVersion=3.5-IV2, 
Security=PLAINTEXT

Related to https://issues.apache.org/jira/browse/KAFKA-13737

Occurred: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.admin/LeaderElectionCommandTest/Build___JDK_8_and_Scala_2_123__Type_ZK__Name_testPreferredReplicaElection__MetadataVersion_3_5_IV2__Security_PLAINTEXT/
h3. Error Message
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists.{code}
h3. Stacktrace
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists.{code}
h3. Standard Output
{code:java}
Successfully completed leader election (UNCLEAN) for partitions unclean-topic-0 
[2023-06-07 14:42:33,845] ERROR [QuorumController id=3000] writeNoOpRecord: 
unable to start processing because of RejectedExecutionException. Reason: The 
event queue is shutting down (org.apache.kafka.controller.QuorumController:467) 
[2023-06-07 14:42:42,699] WARN [AdminClient clientId=adminclient-65] Connection 
to node -2 (localhost/127.0.0.1:35103) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) Successfully 
completed leader election (UNCLEAN) for partitions unclean-topic-0 [2023-06-07 
14:42:44,416] ERROR [QuorumController id=0] writeNoOpRecord: unable to start 
processing because of RejectedExecutionException. Reason: The event queue is 
shutting down (org.apache.kafka.controller.QuorumController:467) [2023-06-07 
14:42:44,716] WARN maxCnxns is not configured, using default value 0. 
(org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:42:44,765] 
WARN No meta.properties file under dir 
/tmp/kafka-2117748934951771120/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:44,986] WARN No 
meta.properties file under dir /tmp/kafka-5133306871105583937/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,214] WARN No 
meta.properties file under dir /tmp/kafka-8449809620400833553/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:42:45,634] WARN 
[ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:42:45,634] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:42:45,872] WARN 
[ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-1. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:42:46,010] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error 
in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, 
minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-3=PartitionData(topicId=vAlEsYVbTFClcpnVRp3AOw, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 2 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetc

[jira] [Created] (KAFKA-15070) Flaky test kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic for codec zstd

2023-06-07 Thread Josep Prat (Jira)
Josep Prat created KAFKA-15070:
--

 Summary: Flaky test 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
 for codec zstd
 Key: KAFKA-15070
 URL: https://issues.apache.org/jira/browse/KAFKA-15070
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.5.0
Reporter: Josep Prat


Flaky test with the following trace and output:
h3. Error Message

org.opentest4j.AssertionFailedError: Timed out waiting for deletion of old 
segments
h3. Stacktrace

org.opentest4j.AssertionFailedError: Timed out waiting for deletion of old 
segments at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) 
at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:123)

...

 
h3. Standard Output

[2023-06-07 16:03:59,974] WARN [LocalLog partition=log-0, 
dir=/tmp/kafka-6339499869249617477] Record format version has been downgraded 
from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:01,691] WARN [LocalLog 
partition=log-0, dir=/tmp/kafka-6391328203703920459] Record format version has 
been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 
16:04:02,661] WARN [LocalLog partition=log-0, 
dir=/tmp/kafka-7107559685120209313] Record format version has been downgraded 
from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:04:04,449] WARN [LocalLog 
partition=log-0, dir=/tmp/kafka-2334095685379242376] Record format version has 
been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 
16:04:12,059] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-4306370019245327987] Could not find offset index file 
corresponding to log file 
/tmp/kafka-4306370019245327987/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:04:21,424] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-8549848301585177643] Could not find offset index file 
corresponding to log file 
/tmp/kafka-8549848301585177643/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:04:42,679] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-8308685679443421785] Could not find offset index file 
corresponding to log file 
/tmp/kafka-8308685679443421785/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:04:50,435] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-2686097435338562303] Could not find offset index file 
corresponding to log file 
/tmp/kafka-2686097435338562303/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:07:16,263] WARN [LocalLog partition=log-0, 
dir=/tmp/kafka-5435804108212698551] Record format version has been downgraded 
from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:07:35,193] WARN [LocalLog 
partition=log-0, dir=/tmp/kafka-4310277229895025994] Record format version has 
been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 
16:07:55,323] WARN [LocalLog partition=log-0, 
dir=/tmp/kafka-3364951894697258113] Record format version has been downgraded 
from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 16:08:16,286] WARN [LocalLog 
partition=log-0, dir=/tmp/kafka-3161518940405121110] Record format version has 
been downgraded from V2 to V0. (kafka.log.LocalLog:70) [2023-06-07 
16:35:03,765] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-2385863108707929062] Could not find offset index file 
corresponding to log file 
/tmp/kafka-2385863108707929062/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:35:06,406] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-5380450050465409057] Could not find offset index file 
corresponding to log file 
/tmp/kafka-5380450050465409057/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:35:09,061] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-7510941634638265317] Could not find offset index file 
corresponding to log file 
/tmp/kafka-7510941634638265317/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:35:11,593] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-7423113520781905391] Could not find offset index file 
corresponding to log file 
/tmp/kafka-7423113520781905391/log-0/0300.log, recovering 
segment and rebuilding index files... (kafka.log.LogLoader:74) [2023-06-07 
16:35:14,159] ERROR [LogLoader partition=log-0, 
dir=/tmp/kafka-2120426496175304835] Could not find offset index file 
corresponding to log file 
/tmp/kafka-212042649617530483

[jira] [Commented] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-06-07 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730264#comment-17730264
 ] 

Josep Prat commented on KAFKA-15052:


Adding another occurrence: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.Rep

[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-06-07 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1581417136

   @ijuma Here are the results of my local test runs of test suites which use 
LATEST_0_8_2 and VerifiableProducer:
   
   * test_verifiable_producer.py: 25/25 PASS
   * compatibility_test_new_broker_test.py 44/52
   
producer_version=0.8.2.2.consumer_version=0.8.2.2.compression_types=.none.new_consumer=False.timestamp_type=None
 PASS
   
   * upgrade_test.py 51/54
   
from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.none
 PASS 
from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy
 FAIL
   
   Looking into the snappy failure, it appears that the broker can't find the 
native snappy library and the client is receiving an opaque 
InvocationTargetException. I think this is an environment-specific problem, and 
it also appears on trunk.
   
   Are you able to start a branch build system test for this?





[jira] [Commented] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer

2023-06-07 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730263#comment-17730263
 ] 

Josep Prat commented on KAFKA-8073:
---

I can see this still failing: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/kafka.api/UserQuotaTest/Build___JDK_17_and_Scala_2_13___testThrottledProducerConsumer_String__quorum_kraft/

> Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
> --
>
> Key: KAFKA-8073
> URL: https://issues.apache.org/jira/browse/KAFKA-8073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Bill Bejeck
>Priority: Critical
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/]
>  
> Stacktrace and STDOUT
> {noformat}
> Error Message
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
> Stacktrace
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
>   at 
> kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
>   at 
> kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java

[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-06-07 Thread via GitHub


bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1581399670

   merged #13751 into trunk





[GitHub] [kafka] bbejeck merged pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-06-07 Thread via GitHub


bbejeck merged PR #13751:
URL: https://github.com/apache/kafka/pull/13751





[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581369789

   @jolshan here's the basic idea:
   
   ```diff
   diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   index 19f4c54b65..636c7907fa 100644
   --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   @@ -240,6 +240,9 @@ public class Sender implements Runnable {
public void run() {
log.debug("Starting Kafka producer I/O thread.");

   +if (transactionManager != null)
   +transactionManager.setPoisonStateOnInvalidTransition(true);
   +
// main loop, runs until close is called
while (running) {
try {
   diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   index 02f689da68..aa6de49aef 100644
   --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   @@ -122,6 +122,7 @@ public class TransactionManager {
private final Set<TopicPartition> newPartitionsInTransaction;
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
    +private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
private PendingStateTransition pendingTransition;

// This is used by the TxnRequestHandlers to control how long to back 
off before a given request is retried.
   @@ -278,6 +279,7 @@ public class TransactionManager {
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
   +this.shouldPoisonStateOnInvalidTransition = 
ThreadLocal.withInitial(() -> false);
this.pendingRequests = new PriorityQueue<>(10, 
Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.partitionsWithUnresolvedSequences = new HashMap<>();
   @@ -287,6 +289,10 @@ public class TransactionManager {
this.apiVersions = apiVersions;
}

   +void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
   +shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
   +}
   +
public synchronized TransactionalRequestResult initializeTransactions() 
{
return initializeTransactions(ProducerIdAndEpoch.NONE, 
CallingThread.APPLICATION);
}
   @@ -1068,7 +1074,7 @@ public class TransactionManager {
String message = idString + "Invalid transition attempted from 
state "
+ currentState.name() + " to state " + target.name();

   -if (callingThread.shouldPoisonState()) {
   +if (shouldPoisonStateOnInvalidTransition.get()) {
currentState = State.FATAL_ERROR;
lastError = new IllegalStateException(message);
throw lastError;
   ```





[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581356873

   @jolshan The more I look at the code, the more I dislike it. Having the 
extra `CallingThread` parameter to distinguish application threads from the 
`Sender` thread is just... ugly.
   
   What do you think about storing a `Thread` reference or creating a 
`ThreadLocalData` in `TransactionManager`? That way the `Sender` can inform 
`TransactionManager` which thread it's using and we can eliminate all of that 
noise.
   
   What do you think?





[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222035271


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java:
##
@@ -16,21 +16,23 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import org.apache.kafka.common.TopicPartition;
+
 /**
  * The base event type used by all events processed in the
  * coordinator runtime.
  */
-public interface CoordinatorEvent extends EventAccumulator.Event {
+public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition> {
 
 /**
- * Runs the event.
+ * Executes the event.
  */
 void run();
 
 /**
  * Completes the event with the provided exception.
  *
- * @param exception An exception to complete the event with.
+ * @param exception An exception if the processing of the event failed or 
null.

Review Comment:
   nit: maybe "An exception if the processing of the event failed; null 
otherwise."
   As written, I read it as "an exception if the event failed or was null".
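   
   For illustration, with the suggested wording the javadoc would read as below 
(the method signature is assumed from the surrounding context, not shown in the 
diff):
   
   ```java
   /**
    * Completes the event with the provided exception.
    *
    * @param exception An exception if the processing of the event failed;
    *                  null otherwise.
    */
   void complete(Throwable exception);
   ```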






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1222035054


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -977,18 +1058,20 @@ void handleCoordinatorReady() {
 initProducerIdVersion.maxVersion() >= 3;
 }
 
-private void transitionTo(State target) {

Review Comment:
   I overloaded `transitionTo` so that callers that would otherwise pass `null` 
can use the more concise version.
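   
   Roughly this shape (a sketch only; the delegation target and the default 
strategy shown here are assumptions, not the exact code from the PR):
   
   ```java
   // Concise overload for callers that have no exception to record; delegates
   // to the full version with a null error. (Default strategy is assumed.)
   private void transitionTo(State target) {
       transitionTo(target, null, InvalidStateDetectionStrategy.FOREGROUND);
   }
   ```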






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1222034380


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -471,9 +547,9 @@ synchronized void 
requestEpochBumpForPartition(TopicPartition tp) {
 this.partitionsToRewriteSequences.add(tp);
 }
 
-private void bumpIdempotentProducerEpoch() {
+private void 
bumpIdempotentProducerEpoch(@SuppressWarnings("SameParameterValue") 
CallingThread callingThread) {

Review Comment:
   Strictly speaking, it isn't needed, so I removed it.






[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222033908


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline data structures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the
+ * state machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and is delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is completed immediately.
+ *
+ * The runtime framework exposes an asynchronous, future-based API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+/**
+ * Builder to create a CoordinatorRuntime.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public static class Builder<S extends Coordinator<U>, U> {
+private LogContext logContext;
+private CoordinatorEventProcessor eventProcessor;
+private PartitionWriter partitionWriter;
+private CoordinatorLoader loader;
+private CoordinatorBuilderSupplier coordinatorBuilderSupplier;
+
+public Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+public Builder withEventProcessor(CoordinatorEventProcessor 
eventProcessor) {
+this.eventProcessor = eventProcessor;
+return this;
+}
+
+public Builder withPartitionWriter(PartitionWriter 
partitionWriter) {
+this.partitionWriter = partitionWriter;
+return this;
+}
+
+public Builder withLoader(CoordinatorLoader loader) {
+this.loader = loader;
+return this;
+}
+
+public Builder 
withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier 
coordinatorBuilderSupplier) {
+this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+return this;
+}
+
+public CoordinatorRuntime build() {
+if (logContext == null)
+logContext = new LogContext();
+if (eventProcessor == null)
+throw new IllegalA
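
   From the Builder methods quoted above, wiring up a runtime would look roughly 
like the sketch below; the type arguments (`MyCoordinator`, `MyRecord`) and the 
dependency variables are placeholders for illustration, not names from the PR:
   
   ```java
   // Sketch of Builder usage based on the methods quoted above.
   CoordinatorRuntime<MyCoordinator, MyRecord> runtime =
       new CoordinatorRuntime.Builder<MyCoordinator, MyRecord>()
           .withLogContext(new LogContext("[MyCoordinator] "))
           .withEventProcessor(eventProcessor)          // a CoordinatorEventProcessor
           .withPartitionWriter(partitionWriter)        // a PartitionWriter
           .withLoader(loader)                          // a CoordinatorLoader
           .withCoordinatorStateMachineSupplier(coordinatorBuilderSupplier)
           .build();
   ```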

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1222029897


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,1040 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline data structures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the
+ * state machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and is delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is completed immediately.
+ *
+ * The runtime framework exposes an asynchronous, future-based API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+/**
+ * Builder to create a CoordinatorRuntime.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public static class Builder<S extends Coordinator<U>, U> {
+private LogContext logContext;
+private CoordinatorEventProcessor eventProcessor;
+private PartitionWriter partitionWriter;
+private CoordinatorLoader loader;
+private CoordinatorBuilderSupplier coordinatorBuilderSupplier;
+
+public Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+public Builder withEventProcessor(CoordinatorEventProcessor 
eventProcessor) {
+this.eventProcessor = eventProcessor;
+return this;
+}
+
+public Builder withPartitionWriter(PartitionWriter 
partitionWriter) {
+this.partitionWriter = partitionWriter;
+return this;
+}
+
+public Builder withLoader(CoordinatorLoader loader) {
+this.loader = loader;
+return this;
+}
+
+public Builder 
withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier 
coordinatorBuilderSupplier) {
+this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+return this;
+}
+
+public CoordinatorRuntime build() {
+if (logContext == null)
+logContext = new LogContext();
+if (eventProcessor == null)
+throw new IllegalArgume

[GitHub] [kafka] mumrah commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-07 Thread via GitHub


mumrah commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1222020155


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {
+public final MockScheduler scheduler;

Review Comment:
   Can we make this private and add an accessor? Or would that break a bunch of 
usages? I think the Scala usages might be OK with a no-parens method call.
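   
   A sketch of that shape (the constructor wiring is assumed from the Scala 
original quoted above):
   
   ```java
   public class MockTime extends org.apache.kafka.common.utils.MockTime {
       // Private field instead of a public one...
       private final MockScheduler scheduler;
   
       public MockTime(long currentTimeMs, long currentHiResTimeNs) {
           super(0, currentTimeMs, currentHiResTimeNs);
           this.scheduler = new MockScheduler(this);
       }
   
       // ...exposed through an accessor. Scala call sites can still write
       // `time.scheduler`, since Scala allows calling a zero-arg Java method
       // without parentheses.
       public MockScheduler scheduler() {
           return scheduler;
       }
   }
   ```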






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-07 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1222019325


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void 
handleCompletedBatch(ProducerBatch batch, ProduceRespon
 }
 
 public synchronized void transitionToUninitialized(RuntimeException 
exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, 
InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
   Correct. The exception is stored in `lastError`, mostly for the next call to 
`maybeFailWithError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221998443


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given 
producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock 
synchronized {
+if (hasOngoingTransaction(producerId))
+  null
+else
+  verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if 
an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): 
Object = lock synchronized {

Review Comment:
   Yeah, keeping it makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah closed pull request #13825: DO NOT MERGE: Testing a Github action

2023-06-07 Thread via GitHub


mumrah closed pull request #13825: DO NOT MERGE: Testing a Github action
URL: https://github.com/apache/kafka/pull/13825


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221986657


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given 
producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock 
synchronized {
+if (hasOngoingTransaction(producerId))
+  null
+else
+  verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if 
an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): 
Object = lock synchronized {

Review Comment:
   Hmmm...
   ```
   
/Users/justineolshan/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2792:56:
 method getOrMaybeCreateVerificationGuard in class UnifiedLog cannot be 
accessed as a member of kafka.log.UnifiedLog from class ReplicaManagerTest in 
package server
   ```
   
   I can potentially refactor this to not check verification guard in these 
tests, but I do think it is useful. What do you think about leaving as not 
package private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.
+  // This guarantees that transactional records are never written to 
the log outside of the transaction coordinator's knowledge of an open 
transaction on
+  // the partition. If we do not have an ongoing transaction or 
correct guard, return an error and do not append.
+  // There are two phases -- the first append to the log and 
subsequent appends.
+  //
+  // 1. First append: Verification starts with creating a verification 
guard object, sending a verification request to the transaction coordinator, and
+  // given a "verified" response, continuing the append path. (A 
non-verified response throws an error.) We create the unique verification guard 
for the transaction
+  // to ensure there is no race between the transaction coordinator 
response and an abort marker getting written to the log. We need a unique guard 
because we could
+  // have a sequence of events where we start a transaction 
verification, have the transaction coordinator send a verified response, write 
an abort marker,
+  // start a new transaction not aware of the partition, and receive 
the stale verification (ABA problem). With a unique verification guard object, 
this sequence would not
+  // result in appending to the log and would return an error. The 
guard is removed after the first append to the transaction and from then, we 
can rely on phase 2.
+  //
+  // 2. Subsequent appends: Once we write to the transaction, the 
in-memory state currentTxnFirstOffset is populated. This field remains until the
+  // transaction is completed or aborted. We can guarantee the 
transaction coordinator knows about the transaction given step 1 and that the 
transaction is still
+  // ongoing. If the transaction is expected to be ongoing, we will 
not set a verification guard. If the transaction is aborted, 
hasOngoingTransaction is false and
+  // requestVerificationGuard is null, so we will throw an error. A 
subsequent produce request (retry) should create verification state and return 
to phase 1.
+  if (batch.isTransactional && 
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+if (!hasOngoingTransaction(batch.producerId) && 
(requestVerificationGuard != verificationGuard(batch.producerId) || 
requestVerificationGuard == null))

Review Comment:
   This is a holdover from the previous PR that also included tentative state. Since we may move that, I can make this all one. (But I may make it a helper, since this is quite a lot of logic.)
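   
   A hedged sketch of that helper on UnifiedLog, folding the quoted condition into one
   method (the name is an assumption; the client-origin check stays at the call site):
   
   ```scala
   // True when a transactional batch must be rejected: verification is enabled, no
   // transaction is ongoing for this producer, and the request's verification guard
   // does not match the log's current guard (or is missing entirely).
   private def batchMissingRequiredVerification(batch: RecordBatch, requestVerificationGuard: Object): Boolean = {
     producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
       batch.isTransactional &&
       !hasOngoingTransaction(batch.producerId) &&
       (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
   }
   ```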



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-06-07 Thread via GitHub


wcarlson5 commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1221950345


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -308,12 +308,17 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
      *
      * @param activePartitionHostMap  the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
      * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
-     * @param clusterMetadata the current clusterMetadata {@link Cluster}
+     * @param topicPartitionInfo  the current mapping of {@link TopicPartition} -> {@link PartitionInfo}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
-                               final Cluster clusterMetadata) {
-        this.clusterMetadata = clusterMetadata;
+                               final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+        this.partitionsByTopic = new HashMap<>();
+        topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic

Review Comment:
   Can we format this so it is a bit easier to read? 
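   
   One possible shape (hedged: the tail of the lambda is truncated in the quoted diff,
   so the exact grouping below is an assumption):
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.common.TopicPartition;
   
   final class PartitionsByTopicSketch {
       // Group the PartitionInfo values by topic name in one readable expression.
       static Map<String, List<PartitionInfo>> partitionsByTopic(
               final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
           return topicPartitionInfo.values().stream()
                   .collect(Collectors.groupingBy(PartitionInfo::topic));
       }
   }
   ```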



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode

2023-06-07 Thread via GitHub


yashmayya commented on PR #13530:
URL: https://github.com/apache/kafka/pull/13530#issuecomment-1581235501

   > which is oddly not showing up on the GitHub PR UI, although it does show 
up on my fork's branch which this PR is created from
   
   https://www.githubstatus.com/incidents/1g1gkh0qpyvs
   
   Looks like GitHub is having its zillionth incident of the year 🤦‍♂️ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-07 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String 
connName, Callback c
 // If the target state for the connector is stopped, its task count is 
0, and there is no rebalance pending (checked above),
 // we can be sure that the tasks have at least been attempted to be 
stopped (or cancelled if they took too long to stop).
 // Zombie tasks are handled by a round of zombie fencing for exactly 
once source connectors. Zombie sink tasks are handled
-// naturally because requests to alter consumer group offsets will 
fail if there are still active members in the group.
+// naturally because requests to alter consumer group offsets / delete 
consumer groups will fail if there are still active members
+// in the group.
 if (configState.targetState(connName) != TargetState.STOPPED || 
configState.taskCount(connName) != 0) {
-callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be altered. This " +
-"can be done for the specified connector by issuing a PUT 
request to the /connectors/" + connName + "/stop endpoint"), null);
+callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   I think just "modified" should be clear enough to users since this is 
synchronously surfaced to the user via the alter / reset offsets REST API 
response 👍 



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-  

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221909838


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given 
producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock 
synchronized {
+if (hasOngoingTransaction(producerId))
+  null
+else
+  verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if 
an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): 
Object = lock synchronized {

Review Comment:
   This is also used in unifiedLog when we create the object and when we do the 
final check. But I can make it package private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mehbey commented on pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

2023-06-07 Thread via GitHub


mehbey commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1581221839

   Hey @showuon , the 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation#KIP937)
 for this pull request (PR) is published, and there have been a few 
discussions. I wanted to notify you and see if you have any comments, as you 
originally brought up the idea of writing the KIP. Please take a look whenever 
you have the time, in case you haven't already seen it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-07 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221916704


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   Good call--I agree that deleting the c

[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-07 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221803692


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -374,19 +374,38 @@ public synchronized void connectorOffsets(String 
connName, Callback, Map> offsets, Callback cb) {
+protected synchronized void modifyConnectorOffsets(String connName, 
Map, Map> offsets, Callback cb) {
+if (!modifyConnectorOffsetsChecks(connName, cb)) {
+return;
+}
+
+if (offsets == null) {
+worker.resetConnectorOffsets(connName, 
configState.connectorConfig(connName), cb);
+} else {
+worker.alterConnectorOffsets(connName, 
configState.connectorConfig(connName), offsets, cb);
+}
+}
+
+/**
+ * This method performs a few checks for external requests to modify 
(alter or reset) connector offsets and
+ * completes the callback exceptionally if any check fails.
+ * @param connName the name of the connector whose offsets are to be 
modified
+ * @param cb callback to invoke upon completion
+ * @return true if all the checks passed, false otherwise
+ */
+private boolean modifyConnectorOffsetsChecks(String connName, 
Callback cb) {
 if (!configState.contains(connName)) {
 cb.onCompletion(new NotFoundException("Connector " + connName + " 
not found", null), null);
-return;
+return false;
 }
 
 if (configState.targetState(connName) != TargetState.STOPPED || 
configState.taskCount(connName) != 0) {
-cb.onCompletion(new BadRequestException("Connectors must be in the 
STOPPED state before their offsets can be altered. " +
-"This can be done for the specified connector by issuing a 
PUT request to the /connectors/" + connName + "/stop endpoint"), null);
-return;
+cb.onCompletion(new BadRequestException("Connectors must be in the 
STOPPED state before their offsets can be modified externally. " +

Review Comment:
   Nit: "modified externally" may not be very clear to users. I think "altered" 
captures the general concept well enough even if they're doing a reset, but if 
you'd like to be more precise, maybe we can say "altered or reset"?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String 
connName, Callback c
 // If the target state for the connector is stopped, its task count is 
0, and there is no rebalance pending (checked above),
 // we can be sure that the tasks have at least been attempted to be 
stopped (or cancelled if they took too long to stop).
 // Zombie tasks are handled by a round of zombie fencing for exactly 
once source connectors. Zombie sink tasks are handled
-// naturally because requests to alter consumer group offsets will 
fail if there are still active members in the group.
+// naturally because requests to alter consumer group offsets / delete 
consumer groups will fail if there are still active members
+// in the group.
 if (configState.targetState(connName) != TargetState.STOPPED || 
configState.taskCount(connName) != 0) {
-callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be altered. This " +
-"can be done for the specified connector by issuing a PUT 
request to the /connectors/" + connName + "/stop endpoint"), null);
+callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   Same thought RE "modified externally" (though seeing how "modified"/"modify" 
is used in some other log and exception messages in this class, I think just 
"modified" could work here too).



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsT

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-07 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221911516


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-val tp = new TopicPartition(topic, 0)
-val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val tp1 = new TopicPartition(topic, 1)
 val producerId = 24L
 val producerEpoch = 0.toShort
 val sequence = 0
-
-val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
-val metadataCache = mock(classOf[MetadataCache])
+val node = new Node(0, "host1", 0)
 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-val replicaManager = new ReplicaManager(
-  metrics = metrics,
-  config = config,
-  time = time,
-  scheduler = new MockScheduler(time),
-  logManager = mockLogMgr,
-  quotaManagers = quotaManager,
-  metadataCache = metadataCache,
-  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-  alterPartitionManager = alterPartitionManager,
-  addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0, tp1), node)
 try {
-  val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), 
tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-  replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
 
-  // We must set up the metadata cache to handle the append and 
verification.
-  val metadataResponseTopic = Seq(new MetadataResponseTopic()
-.setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-.setPartitions(Seq(
-  new MetadataResponsePartition()
-.setPartitionIndex(0)
-.setLeaderId(0)).asJava))
-  val node = new Node(0, "host1", 0)
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
 
-  when(metadataCache.contains(tp)).thenReturn(true)
-  
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-  when(metadataCache.getAliveBrokerNode(0, 
config.interBrokerListenerName)).thenReturn(Some(node))
-  when(metadataCache.getAliveBrokerNode(1, 
config.interBrokerListenerName)).thenReturn(None)
-  
-  // We will attempt to schedule to the request handler thread using a non 
request handler thread. Set this to avoid error.
-  KafkaRequestHandler.setBypassThreadCheck(true)
+  // If we supply no transactional ID and idempotent records, we do not 
verify.
+  val idempotentRecords = 
MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  appendRecords(replicaManager, tp0, idempotentRecords)
+  verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), 
any[AddPartitionsToTxnManager.AppendCallback]())
+  assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+  // If we supply a transactional ID and some transactional and some 
idempotent records, we should only verify the topic partition with 
transactional records.
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence + 1,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  val idempotentRecords2 = 
MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> 
transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+  verify(addPartitionsToTxnManager, 
times(1)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), 
any[AddPartitionsToTxnManager.AppendCallback]())
+  assertNotEquals(null, getVerificationGuard(replicaManager, tp0, 
producerId))
+  assertEquals(null, get


[GitHub] [kafka] yashmayya commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode

2023-06-07 Thread via GitHub


yashmayya commented on PR #13530:
URL: https://github.com/apache/kafka/pull/13530#issuecomment-1581192139

   > Modifying the status parts of the REST API seems less like introducing 
further divergence and more like honestly reporting that existing divergence to 
users.
   
   Fair enough, I've pushed a change (which is oddly not showing up on the 
GitHub PR UI, although it does show up on my fork's branch which this PR is 
created from) where the connector's status is updated to failed and added some 
tests. I'm wondering whether we should also add some verbiage like `If there 
were any tasks running previously, they will continue running` to make it clear 
to users that it's only the connector instance that has failed as a result of 
not being able to reconfigure its tasks on startup (either first time or post 
update) / restart / resume.
   
   > Regardless, you're correct that we can't really alter the infinite-retry 
logic in distributed mode without a KIP. I do think a long-term fix for this 
problem would involve aligning how the two modes handle this kind of failure. 
Would love to see that if you have the time 😄
   
   I'll add it to my TODO list if you think it's worth fixing 😄 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients

2023-06-07 Thread via GitHub


jolshan merged PR #13811:
URL: https://github.com/apache/kafka/pull/13811


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13823: MINOR: Move MockTime to server-common

2023-06-07 Thread via GitHub


dajac commented on PR #13823:
URL: https://github.com/apache/kafka/pull/13823#issuecomment-1581188466

   @mimaison @showuon @dengziming Would one of you have time to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15056) Kafka producer still fails with ClusterAuthorizationException after permission granted

2023-06-07 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730199#comment-17730199
 ] 

Kirk True commented on KAFKA-15056:
---

[~tophei] is this fixed by 
[KAFKA-13668|https://issues.apache.org/jira/browse/KAFKA-13668]?

cc [~pnee]

> Kafka producer still fails with ClusterAuthorizationException after 
> permission granted
> --
>
> Key: KAFKA-15056
> URL: https://issues.apache.org/jira/browse/KAFKA-15056
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff
>Priority: Major
>
> Hi team, we are using kafka client 3.1.2 in the application. When initiating 
> a KafkaProducer without explicitly configuring idempotent write, it failed 
> with ClusterAuthorizationException, which is expected since idempotent 
> write is enabled by default. But after we granted the producer principal the 
> IDEMPOTENT_WRITE permission, the producer still failed with the same error until 
> we restarted the application and re-initiated the producer.
> After checking the log and stacktrace, the code fails at this line 
> [https://github.com/apache/kafka/blame/3.1.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1000]
> and in turn throws an exception at this line 
> [https://github.com/apache/kafka/blob/3.1.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1125]
> It appears the ACL check does not happen dynamically at runtime, considering 
> the 'currentState' was still not set to a correct value after the permission was 
> granted. Besides, do we omit the check of 
> 'transactionManager.isTransaction()' at this line on purpose in 3.1.2? 
> [https://github.com/apache/kafka/blob/3.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#LL988C70-L988C70]
> This check seemed to make sense, since only transactional producers need to 
> further call 'transactionManager.maybeAddPartitionToTransaction(tp);'?
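
A workaround sketch for the reported situation (standard producer configs; the broker
address is a placeholder). Either grant IDEMPOTENT_WRITE, or opt out of the 3.x
idempotence default when creating the producer; since the report implies the ACL is
only checked when the producer initializes, the producer must be re-created after an
ACL change either way.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

final class NonIdempotentProducerExample {
    static KafkaProducer<String, String> create() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Opt out of the idempotence default so no IDEMPOTENT_WRITE ACL is required.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        return new KafkaProducer<>(props);
    }
}
{code}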



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT

2023-06-07 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730195#comment-17730195
 ] 

Kirk True commented on KAFKA-14445:
---

[~ocadaruma] my apologies, I didn't read your initial comments carefully enough.

You're correct, my patch does nothing to handle this case, so a patch would be 
welcomed :)

> Producer doesn't request metadata update on REQUEST_TIMED_OUT
> -
>
> Key: KAFKA-14445
> URL: https://issues.apache.org/jira/browse/KAFKA-14445
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>
> Produce requests may fail with a timeout after `request.timeout.ms` in two cases:
>  * No produce response was received within `request.timeout.ms`
>  * A produce response was received, but it ended up with `REQUEST_TIMED_OUT` on the broker
> The former case usually happens when a broker machine fails or there's a network glitch, etc.
> In that case, the connection is disconnected and a metadata update is requested to discover the new leader: 
> [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556]
>  
> The problem is the latter case (REQUEST_TIMED_OUT on the broker).
> In that case, the produce request ends up with a TimeoutException, which doesn't inherit from InvalidMetadataException, so it doesn't trigger a metadata update.
>  
> The typical cause of REQUEST_TIMED_OUT is replication delay due to a follower-side problem, for which a metadata update indeed doesn't make much sense.
>  
> However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT can cause produce requests to retry unnecessarily, which may end up with batch expiration due to the delivery timeout.
> Below is the scenario we experienced:
>  * Environment:
>  ** Partition tp-0 has 3 replicas: 1, 2, 3. The leader is 1
>  ** min.insync.replicas=2
>  ** acks=all
>  * Scenario:
>  ** Broker 1 "partially" failed
>  *** It lost its ZooKeeper connection and was kicked out of the cluster
>   There was a controller log like:
>  * 
> {code:java}
> [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , 
> deleted brokers: 1, bounced brokers: {code}
>  * 
>  ** 
>  *** However, the broker was somehow able to continue receiving produce requests
>   We're still investigating how this is possible.
>   Indeed, broker 1 was somewhat "alive" and kept working according to server.log
>  *** In other words, broker 1 became a "zombie"
>  ** Broker 2 was elected as the new leader
>  *** Broker 3 became a follower of broker 2
>  *** However, since broker 1 was still out of the cluster, it didn't receive LeaderAndIsr, so broker 1 kept considering itself the leader of tp-0
>  ** Meanwhile, the producer kept sending produce requests to broker 1, and the requests failed due to REQUEST_TIMED_OUT because no brokers replicate from broker 1
>  *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer didn't have a chance to update its stale metadata
>  
> So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT exception, to address the case where the old leader became a "zombie"
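
A hedged sketch of the suggested change in the producer's Sender (illustrative only,
not an actual patch; assumes the Sender's `metadata` field and the Errors enum):

{code:java}
private void maybeRefreshMetadata(final Errors error) {
    // InvalidMetadataException subtypes already force a refresh today; the proposal
    // is to also refresh on REQUEST_TIMED_OUT so a stale ("zombie") leader is dropped.
    if (error.exception() instanceof InvalidMetadataException || error == Errors.REQUEST_TIMED_OUT) {
        metadata.requestUpdate();
    }
}
{code}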



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest

2023-06-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13531:
-
Attachment: 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout

> Flaky test NamedTopologyIntegrationTest
> ---
>
> Key: KAFKA-13531
> URL: https://issues.apache.org/jira/browse/KAFKA-13531
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthew de Detrich
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout
>
>
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
> {quote}java.lang.AssertionError: Did not receive all 3 records from topic 
> output-stream-2 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <3> but: <0> was less than <3> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617)
>  at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote}
> STDERR
> {quote}java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
>  at 
> org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
>  Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: 
> Deleting offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.

[jira] [Commented] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest

2023-06-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730190#comment-17730190
 ] 

John Roesler commented on KAFKA-13531:
--

Seen again while verifying the 3.5.0 release artifacts (this was the only test 
failure):

Gradle Test Run :streams:test > Gradle Test Executor 554 > 
NamedTopologyIntegrationTest > 
shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED
    java.lang.AssertionError: Did not receive all 3 records from topic 
output-stream-1 within 6 ms,  currently accumulated data is []
    Expected: is a value equal to or greater than <3>
         but: <0> was less than <3>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730)
        at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
        at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:353)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699)
        at 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563)

 

logs: 
[^org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout]

> Flaky test NamedTopologyIntegrationTest
> ---
>
> Key: KAFKA-13531
> URL: https://issues.apache.org/jira/browse/KAFKA-13531
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthew de Detrich
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout
>
>
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
> {quote}java.lang.AssertionError: Did not receive all 3 records from topic 
> output-stream-2 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <3> but: <0> was less than <3> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617)
>  at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote}
> STDERR
> {quote}java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
>  at 
> org.apache.kafka.streams.processor.internals.

[jira] [Created] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader

2023-06-07 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15069:
---

 Summary: Refactor scanning hierarchy out of DelegatingClassLoader
 Key: KAFKA-15069
 URL: https://issues.apache.org/jira/browse/KAFKA-15069
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


The DelegatingClassLoader is involved both in scanning and in using the results of 
scanning to perform classloading.
Instead, the scanning should take place outside of the DelegatingClassLoader, and the 
results of scanning should be passed back into the DelegatingClassLoader for its 
classloading functionality.
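
A hedged sketch of the intended separation (all names below are assumptions, not the
final refactor):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Collection;

// Hypothetical scanner abstraction that lives outside the classloader.
interface PluginScanner {
    ScanResult scan(Collection<Path> pluginLocations);
}

// Placeholder for whatever scanning produces (plugin aliases, versions, loaders, ...).
final class ScanResult { }

// The classloader then only consumes scan results instead of producing them itself.
class DelegatingClassLoader extends URLClassLoader {
    private ScanResult scanResult;

    DelegatingClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    void installScanResult(ScanResult scanResult) {
        this.scanResult = scanResult;
    }
}
{code}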



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-06-07 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1581120917

   I have not, but I will aim to carry out the check in the upcoming days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-06-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-10337:
---

Assignee: Erik van Oosten  (was: David Jacot)

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Erik van Oosten
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.
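
For illustration, a minimal sketch of that barrier pattern (a hypothetical 
helper, not code from the patch):

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitBarrierExample {
    // Commits asynchronously, then uses an empty-map commitSync() as a
    // barrier so the async callback has run before a rebalance proceeds.
    static void commitBeforeRebalance(KafkaConsumer<String, String> consumer,
                                      Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, (committed, error) -> {
            if (error != null) {
                // Application-level bookkeeping for failed commits goes here.
            }
        });
        // Per the commitSync() JavaDoc, the callback above should be invoked
        // before this call returns; before the fix, the empty map hit an early
        // return in ConsumerCoordinator.commitOffsetsSync() and skipped the wait.
        consumer.commitSync(Collections.emptyMap());
    }
}
{code}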



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation

2023-06-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730156#comment-17730156
 ] 

ASF GitHub Bot commented on KAFKA-15051:


C0urante opened a new pull request, #520:
URL: https://github.com/apache/kafka-site/pull/520

   Ports the changes from https://github.com/apache/kafka/pull/13803 back 
through 3.2, the version that originally added this endpoint.




> docs: add missing connector plugin endpoint to documentation
> 
>
> Key: KAFKA-15051
> URL: https://issues.apache.org/jira/browse/KAFKA-15051
> Project: Kafka
>  Issue Type: Task
>  Components: docs, documentation
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Minor
>
> The GET /plugin/config endpoint added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions]
>  is not included in the Connect documentation page: 
> https://kafka.apache.org/documentation/#connect_rest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-06-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-10337:
---

Assignee: David Jacot

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-06-07 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1581026296

   I see. TBH I haven't followed that migration of late. There are also 
build failures which I haven't addressed yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-07 Thread via GitHub


erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1581020841

   > @erikvanoosten It seems that you don't have an account for jira so I can't 
assign the ticket to you. You should request one.
   
   I do have an account. My Jira userid is erikvanoosten.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-07 Thread via GitHub


C0urante commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1581020451

   @vvcephei haha, no worries! I'll leave Jenkins alone if I see you in the 
neighborhood testing again 🙏


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-07 Thread via GitHub


C0urante merged PR #13803:
URL: https://github.com/apache/kafka/pull/13803


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-07 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1581002727

   Ah, I hadn't noticed that this was only introduced in 
https://github.com/apache/kafka/pull/13465. In that case, the good news is that 
this bug hasn't made it into a release yet! And the bad news is that it's a 
regression and we have to merge a fix before putting out 3.6.0 😅 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-06-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10337.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-07 Thread via GitHub


dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1580998335

   @erikvanoosten It seems that you don't have an account for jira so I can't 
assign the ticket to you. You should request one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-06-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-10337:
---

Assignee: (was: Kirk True)

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-07 Thread via GitHub


dajac merged PR #13678:
URL: https://github.com/apache/kafka/pull/13678


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12306: KAFKA-13976: Improvements for OpenAPI specs

2023-06-07 Thread via GitHub


vamossagar12 commented on PR #12306:
URL: https://github.com/apache/kafka/pull/12306#issuecomment-1580988777

   Converting to draft. Need some time before I get to this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 closed pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…

2023-06-07 Thread via GitHub


vamossagar12 closed pull request #12485: KAFKA-14131: Adding InterruptException 
when reading to end of Offseto…
URL: https://github.com/apache/kafka/pull/12485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14131) KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop

2023-06-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14131:
-

Assignee: Sambhav Jain  (was: Sagar Rao)

> KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
> --
>
> Key: KAFKA-14131
> URL: https://issues.apache.org/jira/browse/KAFKA-14131
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Justinwins
>Assignee: Sambhav Jain
>Priority: Major
>
> When a herder starts, its KafkaOffsetBackingStore runs readToLogEnd() on the
> DistributedHerder.herderExecutor thread (named "Distributed-connect-", e.g.
> Distributed-connect-28-1), which may take a few minutes.
> If another thread tries to shut down the herder, it blocks for
> "task.shutdown.graceful.timeout.ms" before the DistributedHerder.herderExecutor
> is interrupted.
> Once that executor thread is interrupted, KafkaOffsetBackingStore.readToLogEnd()
> calls poll(Integer.MAX_VALUE) and logs "Error polling" because the read was
> interrupted; "consumer.position" then never advances, and readToLogEnd() falls
> into an infinite loop.
>  
> {code:java}
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop never exits once the thread is interrupted
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // While the thread is in interrupted status, consumer.position() does not advance.
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // poll() catches InterruptedException and logs it without rethrowing.
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> } {code}
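
A minimal sketch of one possible guard, assuming that surfacing the 
interruption is acceptable (the closed PR #12485 explored adding an 
InterruptException in this path; this is not the committed fix):

{code:java}
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;

// Sketch of the same loop as quoted above, but failing fast once the worker
// thread has been interrupted, so herder shutdown is no longer blocked forever.
private void readToLogEnd() {
    Map<TopicPartition, Long> endOffsets = readEndOffsets(consumer.assignment());
    while (!endOffsets.isEmpty()) {
        if (Thread.currentThread().isInterrupted()) {
            // Propagate instead of letting poll() swallow the interruption.
            throw new InterruptException("Interrupted while reading to the end of the log");
        }
        // ... per-partition position checks and poll() exactly as quoted above ...
    }
}
{code}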



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13821: MINOR: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-07 Thread via GitHub


C0urante commented on PR #13821:
URL: https://github.com/apache/kafka/pull/13821#issuecomment-1580969721

   @gharris1727 can we start filing Jira tickets for these changes to 
classloading logic? They're large enough that I don't think `MINOR` is 
warranted anymore, and it'd be nice to have a picture of how they fit into the 
high-level implementation plan for KIP-898.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


