[jira] [Commented] (KAFKA-14322) Kafka node eating Disk continuously

2023-05-03 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719157#comment-17719157
 ] 

Sergey Ivanov commented on KAFKA-14322:
---

Hi,

We faced a similar problem.

I described it in ticket KAFKA-14817; the two issues may be related.

> Kafka node eating Disk continuously 
> 
>
> Key: KAFKA-14322
> URL: https://issues.apache.org/jira/browse/KAFKA-14322
> Project: Kafka
>  Issue Type: Bug
>  Components: log, log cleaner
>Affects Versions: 2.8.1
>Reporter: Abhijit Patil
>Priority: Major
> Attachments: image-2022-10-19-15-51-52-735.png, 
> image-2022-10-19-15-53-39-928.png
>
>
> We have a Kafka 2.8.1 cluster in our production environment. Its disk 
> consumption grows continuously until it has eaten all allocated disk space and 
> the node crashes with no disk space left.
> !image-2022-10-19-15-51-52-735.png|width=344,height=194!
>  
> !image-2022-10-19-15-53-39-928.png|width=470,height=146!
> [Log partition=__consumer_offsets-41, dir=/var/lib/kafka/data/kafka-log0] 
> Rolled new log segment at offset 10537467423 in 4 ms. (kafka.log.Log) 
> [data-plane-kafka-request-handler-4]"
> I can see that node 0 keeps rolling new segments for partition 
> __consumer_offsets-41, but they never get cleaned up.
> This is the root cause of the increase in disk usage.  
> Due to some condition/bug/trigger, something has gone wrong internally with 
> the consumer offset coordinator thread and it has gone berserk!  
>  
> Take a look at the consumer-offset logs it is generating, shown below. If you 
> look closely, it is writing the same data in a loop forever. The product 
> topic in question doesn't have any traffic. This generates an insane 
> amount of consumer-offset logs, which currently amounts to *571GB*, and it is 
> endless: no matter how many terabytes we add, it will eat them eventually.  
> One more thing: the consumer offset logs it generates also mark 
> everything as invalid, as you can see in the second log dump below.
>  
> -kafka-0 data]$ du -sh kafka-log0/__consumer_offsets-*
> 12K kafka-log0/__consumer_offsets-11
> 12K kafka-log0/__consumer_offsets-14
> 12K kafka-log0/__consumer_offsets-17
> 12K kafka-log0/__consumer_offsets-2
> 12K kafka-log0/__consumer_offsets-20
> 12K kafka-log0/__consumer_offsets-23
> 12K kafka-log0/__consumer_offsets-26
> 12K kafka-log0/__consumer_offsets-29
> 12K kafka-log0/__consumer_offsets-32
> 12K kafka-log0/__consumer_offsets-35
> 12K kafka-log0/__consumer_offsets-38
> *588G* kafka-log0/__consumer_offsets-41
> 48K kafka-log0/__consumer_offsets-44
> 12K kafka-log0/__consumer_offsets-47
> 12K kafka-log0/__consumer_offsets-5
> 12K kafka-log0/__consumer_offsets-8
>  
> [response-consumer,feature.response.topic,2]::OffsetAndMetadata(offset=107, 
> leaderEpoch=Optional[23], metadata=, commitTimestamp=1664883985122, 
> expireTimestamp=None) 
> *[response-consumer,feature.response.topic,15]::OffsetAndMetadata(offset=112, 
> leaderEpoch=Optional[25], metadata=, commitTimestamp=1664883985129, 
> expireTimestamp=None)*
>  
>  
> *[response-consumer,feature.response.topic,15]::OffsetAndMetadata(offset=112,
>  leaderEpoch=Optional[25], metadata=, commitTimestamp=1664883985139, 
> expireTimestamp=None)* 
> [response-consumer,.feature.response.topic,13]::OffsetAndMetadata(offset=112,
>  leaderEpoch=Optional[24], metadata=, commitTimestamp=1664883985139, 
> expireTimestamp=None)
>  
>  
> baseOffset: 5616487061 lastOffset: 5616487061 count: 1 baseSequence: 0 
> lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 6 
> isTransactional: false
> isControl: false position: 3423 CreateTime: 1660892213452 size: 175 magic: 2 
> compresscodec: NONE crc: 1402370404 *isvalid: true*
> baseOffset: 5616487062 lastOffset: 5616487062 count: 1 baseSequence: 0 
> lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 6 
> isTransactional: false
> isControl: false position: 3598 CreateTime: 1660892213462 size: 175 magic: 2 
> compresscodec: NONE crc: 1105941790 *isvalid: true*
> |offset: 5616487062 CreateTime: 1660892213462 keysize: 81 valuesize: 24 
> sequence: 0 headerKeys: [] key:|
> For our topics we have the following retention configuration:
> retention.ms: 8640
> segment.bytes: 1073741824
>  
> For the internal consumer offsets topic, the cleanup policy and retention are the defaults.
>  
> We suspect this is similar to 
> https://issues.apache.org/jira/browse/KAFKA-9543 
> This appears in one environment only; the same cluster with the same configuration 
> works correctly in other environments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable

2023-05-03 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719156#comment-17719156
 ] 

Sergey Ivanov commented on KAFKA-14817:
---

Hi,

We faced the same issue in Kafka 2.8.1. Some partitions of the __consumer_offsets 
topic were taking up an unusually large amount of disk space:
{code:java}
9.7G    /var/opt/kafka/data/1/__consumer_offsets-29
9.5G    /var/opt/kafka/data/1/__consumer_offsets-40
6.1G    /var/opt/kafka/data/1/__consumer_offsets-1
132M    /var/opt/kafka/data/1/__consumer_offsets-37
129M    /var/opt/kafka/data/1/__consumer_offsets-24
128M    /var/opt/kafka/data/1/__consumer_offsets-5
...
9.0M    /var/opt/kafka/data/1/__consumer_offsets-6
7.9M    /var/opt/kafka/data/1/__consumer_offsets-2
620K    /var/opt/kafka/data/1/__consumer_offsets-35
{code}
And when we checked the logs inside, we found very old segments: 
{code:java}
bash-5.1$ ls -lah /var/opt/kafka/data/1/__consumer_offsets-29
total 9.7G
drwxrwsr-x    2 kafka kafka  24K May  3 04:29 .
drwxrwsr-x 4583 kafka kafka 432K May  3 13:55 ..
-rw-rw-r--    1 kafka kafka    8 Feb 26 06:52 .index
-rw-rw-r--    1 kafka kafka  19K Feb 26 06:52 .log
-rw-rw-r--    1 kafka kafka   12 Feb 26 06:52 .timeindex
-rw-rw-r--    1 kafka kafka    0 Feb 27 13:18 32953821.index
-rw-rw-r--    1 kafka kafka 7.8K Feb 27 13:18 32953821.log
...
-rw-r--r--    1 kafka kafka  34M May  3 13:55 69099848.log
-rw-r--r--    1 kafka kafka   10 May  3 02:37 69099848.snapshot
-rw-r--r--    1 kafka kafka  10M May  3 13:55 69099848.timeindex
-rw-r--r--    1 kafka kafka  803 May  3 04:29 leader-epoch-checkpoint
-rw-rw-r--    1 kafka kafka   43 Nov 18 12:12 partition.metadata
{code}
But this topic has the standard retention of 7 days.
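
A check like the one above can be scripted. The following is only a rough sketch under stated assumptions: the class and method names (`StaleSegmentFinder`, `findStaleSegments`) are hypothetical, and it relies purely on file modification times, which for a compacted topic are a hint rather than proof that the cleaner is stuck.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration; not part of Kafka. */
final class StaleSegmentFinder {

    /**
     * Returns the *.log files in partitionDir whose last-modified time is
     * older than (now - retention), sorted by path.
     */
    static List<Path> findStaleSegments(Path partitionDir, Duration retention, Instant now)
            throws IOException {
        Instant cutoff = now.minus(retention);
        List<Path> stale = new ArrayList<>();
        // The glob restricts the listing to segment data files (no indexes or snapshots).
        try (DirectoryStream<Path> segments = Files.newDirectoryStream(partitionDir, "*.log")) {
            for (Path segment : segments) {
                Instant lastModified = Files.getLastModifiedTime(segment).toInstant();
                if (lastModified.isBefore(cutoff)) {
                    stale.add(segment);
                }
            }
        }
        Collections.sort(stale);
        return stale;
    }
}
```

Called with the partition directory and `Duration.ofDays(7)`, it would list segments such as the February ones shown in the listing above.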

We also found a log cleaner issue with this partition in the logs:
{code:java}
[2023-04-22 12:10:06,410] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/opt/kafka/data/1/__consumer_offsets-29, topic=__consumer_offsets, 
partition=29, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=64827489). Marking its partition (__consumer_offsets-29) as 
uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: Batch size 176 < buffer size 1048588, but not 
processed for log segment 
/var/opt/kafka/data/1/__consumer_offsets-29/33563994.log at 
position 37515124
Caused by: java.lang.IllegalStateException: Batch size 176 < buffer size 
1048588, but not processed for log segment 
/var/opt/kafka/data/1/__consumer_offsets-29/33563994.log at 
position 37515124
{code}
As a *workaround* we changed "cleanup.policy" to "delete"; Kafka then removed all old 
segments (including those of the uncleanable partitions) and kept only one week of logs. After 
that we set the policy back to "compact".
This case is also discussed in the article 
[https://luppeng.wordpress.com/2022/08/21/possible-reasons-why-a-kafka-topic-is-not-being-compacted/]
 (point 3).

But how can this issue be solved *permanently*?

> LogCleaner mark some partitions of __consumer_offsets as uncleanable
> 
>
> Key: KAFKA-14817
> URL: https://issues.apache.org/jira/browse/KAFKA-14817
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: ZhenChun Pan
>Priority: Major
>
> We found that some partitions of topic __consumer_offsets no longer apply retention to 
> their logs and take up a lot of disk space. We then found that these partitions of 
> topic __consumer_offsets had been marked as uncleanable in log-cleaner.log. The 
> logs are below:
> [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-24. (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-24... (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-24 for 5 segments in offset range [0, 2360519). 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O 

[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-03 Thread via GitHub


mjsax commented on PR #13654:
URL: https://github.com/apache/kafka/pull/13654#issuecomment-1534030108

   13 tests failed -- `StreamsBrokerBounceTest` should be unrelated, but 
`StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces` failed as well.
   
   Re-triggered to see if the same tests fail, or if it's flakiness: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5657/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-05-03 Thread via GitHub


jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1533922748

   I haven't updated the base in a while, so I'm merging so that we can hopefully get 
a cleaner build.





[GitHub] [kafka] jolshan merged pull request #13655: MINOR: Reduce number of threads created for integration test brokers

2023-05-03 Thread via GitHub


jolshan merged PR #13655:
URL: https://github.com/apache/kafka/pull/13655





[GitHub] [kafka] mumrah commented on a diff in pull request #13540: MINOR: improve QuorumController logging

2023-05-03 Thread via GitHub


mumrah commented on code in PR #13540:
URL: https://github.com/apache/kafka/pull/13540#discussion_r1184415101


##
metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+
+/**
+ * Converts a metadata record to a string suitable for logging to slf4j.
+ * This means that passwords and key material are omitted from the output.
+ */
+public final class RecordRedactor {

Review Comment:
   Nice. We can make use of this in KRaftMigrationDriver when we log the 
migrated records (in a separate PR)
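
To illustrate the idea described in the Javadoc quoted above (rendering a record for logging while leaving out secrets), here is a minimal sketch. It is not the `RecordRedactor` from the PR: the class name `ConfigRedactor`, the set of sensitive keys, and the `(redacted)` placeholder are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not the RecordRedactor from the PR. */
final class ConfigRedactor {

    // Assumed list of sensitive keys; a real implementation would more likely
    // consult the config schema than a hard-coded set.
    private static final Set<String> SENSITIVE_KEYS =
        new HashSet<>(Arrays.asList("ssl.key.password", "sasl.jaas.config"));

    /** Renders the configs for logging, replacing sensitive values with a placeholder. */
    static String toLoggableString(Map<String, String> configs) {
        Map<String, String> safe = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            boolean sensitive = SENSITIVE_KEYS.contains(entry.getKey());
            safe.put(entry.getKey(), sensitive ? "(redacted)" : entry.getValue());
        }
        return safe.toString();
    }
}
```

The point of building a separate map is that the secret value never reaches the string that is handed to the logger.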






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184411246


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1090,6 +1090,9 @@ private void maybeFailWithError() {
         } else if (lastError instanceof InvalidProducerEpochException) {
             throw new InvalidProducerEpochException("Producer with transactionalId '" + transactionalId
                     + "' and " + producerIdAndEpoch + " attempted to produce with an old epoch");
+        } else if (lastError instanceof IllegalStateException) {
+            throw new IllegalStateException("Producer with transactionalId '" + transactionalId

Review Comment:
   Sorry to ask for another update, but should we add a more specific 
explanation? Maybe mentioning state transitions?
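
As an illustration of what an error message that mentions state transitions could look like, here is a toy state machine. It is a sketch only: `ToyTransactionState`, its transition table, and the message wording are assumptions for the example and are not taken from `TransactionManager`.

```java
import java.util.EnumSet;
import java.util.Set;

/** Toy state machine with hypothetical names; not the TransactionManager from the PR. */
final class ToyTransactionState {

    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State current = State.READY;

    /** Assumed transition table for this sketch: the states each state may move to. */
    private static Set<State> allowedTargets(State source) {
        switch (source) {
            case READY:
                return EnumSet.of(State.IN_TRANSACTION, State.FATAL_ERROR);
            case IN_TRANSACTION:
                return EnumSet.of(State.READY, State.ABORTABLE_ERROR, State.FATAL_ERROR);
            case ABORTABLE_ERROR:
                return EnumSet.of(State.READY, State.FATAL_ERROR);
            default:
                // FATAL_ERROR is terminal in this sketch.
                return EnumSet.noneOf(State.class);
        }
    }

    State current() {
        return current;
    }

    /** Moves to target, or enters FATAL_ERROR and throws if the transition is not allowed. */
    void transitionTo(State target) {
        if (!allowedTargets(current).contains(target)) {
            State source = current;
            current = State.FATAL_ERROR;
            throw new IllegalStateException(
                "Invalid transition attempted from state " + source + " to state " + target);
        }
        current = target;
    }
}
```

Naming both the source and the target state in the message tells the reader which transition was rejected, not just that the producer is in a bad state.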






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184407115


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3408,53 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeInvalidBackgroundTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        assertThrows(IllegalStateException.class, () -> transactionManager.handleFailedBatch(batchWithValue(tp0, "test"), new NetworkException(), false, BACKGROUND));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort(FOREGROUND));
+        assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+        assertThrows(KafkaException.class, () -> transactionManager.maybeAddPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.initializeTransactions());
+        assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    @Test
+    public void testForegroundInvalidStateTransitionIsRecoverable() {

Review Comment:
    






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184406528


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeIllegalTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        // Step 1: create a transaction.
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is
+        // left in the READY state.
+        TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+        runUntil(abortResult::isCompleted);
+        abortResult.await();
+        assertTrue(abortResult.isSuccessful());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.isReady());
+
+        // Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+        // the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+        // verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+        ProducerBatch batch = batchWithValue(tp0, "test");
+        assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+        assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+        assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+        assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+        assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+        assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+        try {
+            operation.run();
+        } catch (KafkaException t) {

Review Comment:
   That may be useful for debugging :) 






[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184405678


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+    @Test
+    public void testTransitionFromNewTargetToRevoke() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 3, 4, 5),
+            mkTopicAssignment(topicId2, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(10, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 3),
+            mkTopicAssignment(topicId2, 6)
+        ), updatedMember.assignedPartitions());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2),
+            mkTopicAssignment(topicId2, 4, 5)
+        ), updatedMember.partitionsPendingRevocation());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 4, 5),
+            mkTopicAssignment(topicId2, 7, 8)
+        ), updatedMember.partitionsPendingAssignment());
+    }
+
+    @Test
+    public void testTransitionFromNewTargetToAssigning() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+            mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(11, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3),
+
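
The expectations in `testTransitionFromNewTargetToRevoke` above can be read as plain set arithmetic on partition IDs. The sketch below reproduces the numbers for the first topic only. It is an assumption-laden illustration: `AssignmentDiff` and its methods are hypothetical names, and it ignores the member and partition epochs that the builder in the PR also takes into account.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not the CurrentAssignmentBuilder from the PR. */
final class AssignmentDiff {

    /** Partitions that are currently owned and also in the target: the member keeps these. */
    static Set<Integer> retained(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.retainAll(target);
        return result;
    }

    /** Partitions that are currently owned but not in the target: these must be revoked. */
    static Set<Integer> pendingRevocation(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.removeAll(target);
        return result;
    }

    /** Partitions that are in the target but not yet owned: these are still to be assigned. */
    static Set<Integer> pendingAssignment(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(target);
        result.removeAll(current);
        return result;
    }
}
```

With current partitions 1, 2, 3 and target partitions 3, 4, 5, this gives 3 as retained, 1 and 2 as pending revocation, and 4 and 5 as pending assignment, which matches the values the test asserts for `topicId1`.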

[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184402932


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeIllegalTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        // Step 1: create a transaction.
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is
+        // left in the READY state.
+        TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+        runUntil(abortResult::isCompleted);
+        abortResult.await();
+        assertTrue(abortResult.isSuccessful());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.isReady());
+
+        // Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+        // the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+        // verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+        ProducerBatch batch = batchWithValue(tp0, "test");
+        assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+        assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+        assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+        assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+        assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+        assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+        try {
+            operation.run();
+        } catch (KafkaException t) {

Review Comment:
   @jolshan Based on the API contract defined in the Javadoc for the 
transactional methods in `KafkaProducer`, it seems like I should add a special 
case to `maybeFailWithError()` for handling when `lastError` is of type 
`IllegalStateException`. Thoughts?






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184402932


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeIllegalTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        // Step 1: create a transaction.
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is
+        // left in the READY state.
+        TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+        runUntil(abortResult::isCompleted);
+        abortResult.await();
+        assertTrue(abortResult.isSuccessful());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.isReady());
+
+        // Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+        // the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+        // verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+        ProducerBatch batch = batchWithValue(tp0, "test");
+        assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+        assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+        assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+        assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+        assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+        assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+        try {
+            operation.run();
+        } catch (KafkaException t) {

Review Comment:
   @jolshan Based on the API contract for the transactional methods, it seems 
like I should add a special case to `maybeFailWithError()` for handling when 
`lastError` is of type `IllegalStateException`. Thoughts?






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-03 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1184401971


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeIllegalTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        // Step 1: create a transaction.
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is
+        // left in the READY state.
+        TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+        runUntil(abortResult::isCompleted);
+        abortResult.await();
+        assertTrue(abortResult.isSuccessful());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.isReady());
+
+        // Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+        // the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+        // verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+        ProducerBatch batch = batchWithValue(tp0, "test");
+        assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+        assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+        assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+        assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+        assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+        assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+        try {
+            operation.run();
+        } catch (KafkaException t) {

Review Comment:
   @jolshan No. Sadly, it's more convoluted than that.
   
   In the existing implementation, upon detection of the invalid transition in 
`transitionTo`, we throw an `IllegalStateException`.
   
   In this PR, upon detection of the invalid transition in `transitionTo`, we 
first "poison" the state variables and **then** throw an 
`IllegalStateException`. Later calls to `maybeFailWithError()` will then throw 
a `KafkaException`, since `hasError()` is `true` and there's no special-case 
handling for when `lastError` is an instance of `IllegalStateException`.
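
To make the sequence concrete, here is a minimal, self-contained sketch of the "poison, then throw" behaviour described above. It is not the code in this PR: the class, the reduced transition table, the helper methods, and `SketchKafkaException` (a stand-in for `KafkaException`) are all hypothetical. Only the names `transitionTo`, `maybeFailWithError`, `hasError`, and `lastError` come from the discussion.

```java
class PoisonOnIllegalTransitionSketch {
    // Simplified, hypothetical subset of the transaction manager states.
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException.
    static class SketchKafkaException extends RuntimeException {
        SketchKafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private State currentState = State.READY;
    private RuntimeException lastError;

    // Assumed transition table, reduced to what the example needs.
    private static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case IN_TRANSACTION:  return source == State.READY;
            case READY:           return source == State.IN_TRANSACTION;
            case ABORTABLE_ERROR: return source == State.IN_TRANSACTION;
            case FATAL_ERROR:     return true;
            default:              return false;
        }
    }

    void transitionTo(State target, RuntimeException error) {
        if (!isTransitionValid(currentState, target)) {
            IllegalStateException illegal = new IllegalStateException(
                "Invalid transition attempted from state " + currentState + " to state " + target);
            // "Poison" the state variables first ...
            currentState = State.FATAL_ERROR;
            lastError = illegal;
            // ... and only then throw to the caller that attempted the transition.
            throw illegal;
        }
        currentState = target;
        lastError = error;
    }

    boolean hasError() {
        return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
    }

    boolean hasFatalError() {
        return currentState == State.FATAL_ERROR;
    }

    // No special case for IllegalStateException: every stored error is wrapped.
    void maybeFailWithError() {
        if (hasError()) {
            throw new SketchKafkaException(
                "Cannot execute transactional method because we are in an error state", lastError);
        }
    }

    void beginTransaction() {
        maybeFailWithError();
        transitionTo(State.IN_TRANSACTION, null);
    }

    void completeAbort() {
        maybeFailWithError();
        transitionTo(State.READY, null);
    }

    // Simulates the Sender reporting a failed batch.
    void handleFailedBatch(RuntimeException cause) {
        transitionTo(State.ABORTABLE_ERROR, cause);
    }
}
```

In this sketch, calling `handleFailedBatch` while in `READY` throws the `IllegalStateException` directly, whereas a subsequent `beginTransaction()` surfaces the same error wrapped in the stand-in exception, with the `IllegalStateException` as its cause.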






[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184401478


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+    @Test
+    public void testTransitionFromNewTargetToRevoke() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 3, 4, 5),
+            mkTopicAssignment(topicId2, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(10, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 3),
+            mkTopicAssignment(topicId2, 6)
+        ), updatedMember.assignedPartitions());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2),
+            mkTopicAssignment(topicId2, 4, 5)
+        ), updatedMember.partitionsPendingRevocation());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 4, 5),
+            mkTopicAssignment(topicId2, 7, 8)
+        ), updatedMember.partitionsPendingAssignment());
+    }
+
+    @Test
+    public void testTransitionFromNewTargetToAssigning() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+            mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(11, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3),
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184383745


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+    @Test
+    public void testTransitionFromNewTargetToRevoke() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 3, 4, 5),
+            mkTopicAssignment(topicId2, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(10, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 3),
+            mkTopicAssignment(topicId2, 6)
+        ), updatedMember.assignedPartitions());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2),
+            mkTopicAssignment(topicId2, 4, 5)
+        ), updatedMember.partitionsPendingRevocation());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 4, 5),
+            mkTopicAssignment(topicId2, 7, 8)
+        ), updatedMember.partitionsPendingAssignment());
+    }
+
+    @Test
+    public void testTransitionFromNewTargetToAssigning() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setNextMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3),
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+        Assignment targetAssignment = new Assignment(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+            mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+        ));
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, targetAssignment)
+            .withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+            .build();
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+        assertEquals(10, updatedMember.previousMemberEpoch());
+        assertEquals(11, updatedMember.memberEpoch());
+        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(mkAssignment(
+            mkTopicAssignment(topicId1, 1, 2, 3),
+

[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing

2023-05-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719079#comment-17719079
 ] 

Matthias J. Sax commented on KAFKA-14957:
-

Ah. Thanks.

That's gonna be nasty to fix... This part of the docs is generated from the 
code... So it depends on the platform that does the build what ends up in the 
docs. Would require a larger change to generate it differently...
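
For anyone following along, here is a minimal sketch of why the documented default varies with the build machine. It assumes the default is the JVM temp directory joined to "kafka-streams" with the platform file separator. The issue only states that the value is computed from System.getProperty("java.io.tmpdir"), so the exact concatenation, the class name, and the helper methods below are illustrative assumptions, not the actual Kafka Streams code.

```java
import java.io.File;

class StateDirDefaultSketch {
    // Hypothetical helper: joins a temp directory and the "kafka-streams" suffix.
    static String defaultStateDir(String tmpDir) {
        return tmpDir + File.separator + "kafka-streams";
    }

    // Resolves the default on the machine that runs this code, which is
    // what a docs generator would capture at build time.
    static String defaultStateDir() {
        return defaultStateDir(System.getProperty("java.io.tmpdir"));
    }
}
```

If the temp directory already ends with a separator, the result contains a doubled separator, which would be consistent with the "T//kafka-streams" value quoted in the issue.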

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>  Labels: beginner, newbie
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it 
> is computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184300900


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+    /**
+ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184368057


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+    /**
+ 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184358012


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:

Review Comment:
   maybe we can link the ConsumerGroupMember.MemberState docs to this?
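
As a reading aid for the three partition sets in the Javadoc above, here is a rough sketch that treats them as plain set arithmetic over a single topic's partition ids. This is a simplification and an assumption, not the builder's implementation: the real class also consults partition epochs and the partitions the member reports as owned. The class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

class ReconciliationSetsSketch {
    // Convenience constructor for a sorted set of partition ids.
    static Set<Integer> setOf(Integer... partitions) {
        return new TreeSet<>(Arrays.asList(partitions));
    }

    // Assigned Partitions: what the member has now and keeps under the target.
    static Set<Integer> assigned(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.retainAll(target);
        return result;
    }

    // Partitions Pending Revocation: what the member has now but must give up.
    static Set<Integer> pendingRevocation(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.removeAll(target);
        return result;
    }

    // Partitions Pending Assignment: what the target adds on top of the current set.
    static Set<Integer> pendingAssignment(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(target);
        result.removeAll(current);
        return result;
    }
}
```

With a current set of {1, 2, 3} and a target of {3, 4, 5}, this yields {3} assigned, {1, 2} pending revocation, and {4, 5} pending assignment, the same values asserted for `topicId1` in `testTransitionFromNewTargetToRevoke` in this PR's test file.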



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned 

[GitHub] [kafka] cmccabe merged pull request #13551: MINOR: Allow tagged fields with version subset of flexible version range

2023-05-03 Thread via GitHub


cmccabe merged PR #13551:
URL: https://github.com/apache/kafka/pull/13551


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184323192


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the
+ * consumer group protocol. Given the current state of a member and a desired or target
+ * assignment state, the state machine takes the necessary steps to converge them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch     - The desired epoch of the member. It corresponds to the epoch of
+ *                    the target/desired assignment. The member transitions to this epoch
+ *                    when it has revoked the partitions that it does not own or if it
+ *                    does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. This represents what
+ *                    the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke before it can transition
+ *                    to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually receive. The partitions
+ *                    in this set are still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts
+ *                           here when the next epoch does not match the target epoch. It means that
+ *                           a new target assignment has been installed so the reconciliation process
+ *                           must restart. In this state, the Assigned, Revoking and Assigning sets
+ *                           are computed. If Revoking is not empty, the state machine transitions
+ *                           to REVOKE; if Assigning is not empty, it transitions to ASSIGNING;
+ *                           otherwise it transitions to STABLE.
+ * - REVOKE                - This state means that the member must revoke partitions before it can
+ *                           transition to the next epoch and thus start receiving new partitions.
+ *                           The member transitions to the next state only when it has acknowledged
+ *                           the revocation.
+ * - ASSIGNING             - This state means that the member waits on partitions which are still
+ *                           owned by other members in the group. It remains in this state until
+ *                           they are all freed up.
+ * - STABLE                - This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 if the
+     * topic-partition is not assigned. The current epoch is the epoch of the current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+    /**
+ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184321022


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184317038


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] jolshan commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


jolshan commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1533772132

   Thanks Philip. The initialization ones come when there is a thread leak 
somewhere. I'm pretty convinced it wasn't your change, but just wanted to be 
extra safe. I will check the next build. Thanks for your patience.





[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184300900


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@

[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184293956


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Probably - it's just done very consistently in the tests  
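
For readers unfamiliar with the retry step at the end of the quoted test, the general pattern (run an action until it stops throwing or a deadline passes) can be sketched as below. This is an illustration only, not Kafka's `TestUtils` implementation; `RetrySketch`, `ThrowingAction` and `retryOnException` are hypothetical names.

```java
final class RetrySketch {
    private RetrySketch() { }

    /** Hypothetical functional interface: an action that may throw. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Runs the action until it succeeds or timeoutMs has elapsed, sleeping
     * pollIntervalMs between attempts. The last failure is rethrown on timeout.
     */
    static void retryOnException(long timeoutMs, long pollIntervalMs, ThrowingAction action)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                action.run();
                return;                     // success: stop retrying
            } catch (Exception e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e;                // out of time: surface the last error
                }
                Thread.sleep(pollIntervalMs);
            }
        }
    }
}
```

In the quoted test the retried action is `producer::initTransactions`, which is expected to succeed once the mock client has a successful InitProducerId response prepared.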






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184283663


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
 
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Should we close the producer here? Just looking through the failed tests and 
wanted to close any gaps we may have.
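
One common way to make sure a test always closes its producer is try-with-resources, which works for any `AutoCloseable`. The sketch below shows the pattern with a stand-in class; `CloseSketch` and `FakeProducer` are hypothetical and only record whether `close()` ran. It is not a proposed change to the test in this PR.

```java
final class CloseSketch {
    private CloseSketch() { }

    /** Hypothetical stand-in for a producer; remembers whether close() ran. */
    static final class FakeProducer implements AutoCloseable {
        boolean closed = false;

        void initTransactions() {
            // no-op in this sketch
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Uses the producer inside try-with-resources and returns it for inspection. */
    static FakeProducer useAndClose() {
        FakeProducer producer = new FakeProducer();
        try (FakeProducer p = producer) {
            p.initTransactions();
        }   // close() is called here, even if the body throws
        return producer;
    }
}
```

Because the close happens on every exit path, a failed assertion in the body would not leave the producer (and any threads it owns) running.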






[jira] [Created] (KAFKA-14966) Extract reusable common logic from OffsetFetcher

2023-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-14966:
--

 Summary: Extract reusable common logic from OffsetFetcher
 Key: KAFKA-14966
 URL: https://issues.apache.org/jira/browse/KAFKA-14966
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans


The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, 
validate and reset positions. 

For the new consumer based on a refactored threading model, similar 
functionality will be needed by the ListOffsetsRequestManager component. 

This task aims at identifying and extracting the OffsetFetcher functionality 
that can be reused by the new consumer implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor

2023-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-14965:
--

 Summary: Introduce ListOffsetsRequestManager to integrate 
ListOffsetsRequests into new consumer threading refactor
 Key: KAFKA-14965
 URL: https://issues.apache.org/jira/browse/KAFKA-14965
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


This task introduces new functionality for handling ListOffsetsRequests for the
new consumer implementation, as part of the ongoing work on the consumer
threading model refactor.

This task introduces a new class named {{ListOffsetsRequestManager}},
responsible for handling ListOffsets requests performed by the consumer to
expose functionality like beginningOffsets, endOffsets and offsetsForTimes.

The {{OffsetFetcher}} class is used internally by the {{KafkaConsumer}} to
list offsets, so this task will be based on a refactored {{OffsetFetcher}},
reusing the fetching logic as much as possible.





[GitHub] [kafka] jolshan commented on pull request #11096: Adding reviewers.py to help tag reviewers in commit message

2023-05-03 Thread via GitHub


jolshan commented on PR #11096:
URL: https://github.com/apache/kafka/pull/11096#issuecomment-1533709307

   @mumrah Can we please check builds before merging? The build caught the 
license issue. 





[jira] [Updated] (KAFKA-14964) ClientQuotaMetadataManager should not suppress exceptions

2023-05-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14964:
-
Affects Version/s: 3.4.0
   3.5.0

> ClientQuotaMetadataManager should not suppress exceptions
> -
>
> Key: KAFKA-14964
> URL: https://issues.apache.org/jira/browse/KAFKA-14964
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0, 3.5.0
>Reporter: David Arthur
>Priority: Major
>
> As MetadataLoader calls each MetadataPublisher upon receiving new records 
> from the controller, it surrounds the call with a try-catch block in order to 
> pass exceptions to a FaultHandler. The FaultHandler used by MetadataLoader is 
> essential for us to learn about metadata errors on the broker since it 
> increments the metadata loader error JMX metric.
> ClientQuotaMetadataManager is in the update path for ClientQuota metadata 
> updates and is capturing exceptions. This means validation errors (like 
> invalid quotas) will not be seen by the FaultHandler, and the JMX metric will 
> not get incremented.





[jira] [Updated] (KAFKA-14964) ClientQuotaMetadataManager should not suppress exceptions

2023-05-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14964:
-
Component/s: kraft

> ClientQuotaMetadataManager should not suppress exceptions
> -
>
> Key: KAFKA-14964
> URL: https://issues.apache.org/jira/browse/KAFKA-14964
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: David Arthur
>Priority: Major
>
> As MetadataLoader calls each MetadataPublisher upon receiving new records 
> from the controller, it surrounds the call with a try-catch block in order to 
> pass exceptions to a FaultHandler. The FaultHandler used by MetadataLoader is 
> essential for us to learn about metadata errors on the broker since it 
> increments the metadata loader error JMX metric.
> ClientQuotaMetadataManager is in the update path for ClientQuota metadata 
> updates and is capturing exceptions. This means validation errors (like 
> invalid quotas) will not be seen by the FaultHandler, and the JMX metric will 
> not get incremented.





[jira] [Created] (KAFKA-14964) ClientQuotaMetadataManager should not suppress exceptions

2023-05-03 Thread David Arthur (Jira)
David Arthur created KAFKA-14964:


 Summary: ClientQuotaMetadataManager should not suppress exceptions
 Key: KAFKA-14964
 URL: https://issues.apache.org/jira/browse/KAFKA-14964
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur


As MetadataLoader calls each MetadataPublisher upon receiving new records from 
the controller, it surrounds the call with a try-catch block in order to pass 
exceptions to a FaultHandler. The FaultHandler used by MetadataLoader is 
essential for us to learn about metadata errors on the broker since it 
increments the metadata loader error JMX metric.

ClientQuotaMetadataManager is in the update path for ClientQuota metadata 
updates and is capturing exceptions. This means validation errors (like invalid 
quotas) will not be seen by the FaultHandler, and the JMX metric will not get 
incremented.
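
A minimal sketch of the effect described above, assuming hypothetical stand-in types rather than the real MetadataLoader, MetadataPublisher and FaultHandler classes: a publisher that catches its own exception never reaches the loader's try-catch, so the error counter (standing in for the JMX metric) stays at zero.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-ins for illustration; none of these are the Kafka classes. */
class FaultHandlerSketch {

    /** Counts reported faults, standing in for the metadata loader error metric. */
    static final class CountingFaultHandler {
        final AtomicInteger errorCount = new AtomicInteger();

        void handleFault(String message, Throwable cause) {
            errorCount.incrementAndGet();
        }
    }

    /** Calls the publisher inside try-catch and passes any exception to the handler. */
    static void load(Consumer<Integer> publisher, int quota, CountingFaultHandler handler) {
        try {
            publisher.accept(quota);
        } catch (RuntimeException e) {
            handler.handleFault("publisher failed", e);
        }
    }

    static void validate(int quota) {
        if (quota < 0) {
            throw new IllegalArgumentException("invalid quota: " + quota);
        }
    }

    /** Swallows the validation error, so the loader never sees it. */
    static final Consumer<Integer> SUPPRESSING = quota -> {
        try {
            validate(quota);
        } catch (RuntimeException e) {
            // dropped here; nothing propagates to the caller
        }
    };

    /** Lets the validation error propagate to the loader. */
    static final Consumer<Integer> PROPAGATING = FaultHandlerSketch::validate;

    /** Runs one invalid update through the loader and returns the number of faults reported. */
    static int errorsSeen(Consumer<Integer> publisher) {
        CountingFaultHandler handler = new CountingFaultHandler();
        load(publisher, -1, handler);
        return handler.errorCount.get();
    }

    public static void main(String[] args) {
        System.out.println("suppressing publisher faults: " + errorsSeen(SUPPRESSING));
        System.out.println("propagating publisher faults: " + errorsSeen(PROPAGATING));
    }
}
```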





[GitHub] [kafka] hachikuji commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-05-03 Thread via GitHub


hachikuji commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1184131575


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -637,17 +637,31 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       
+      val transactionalProducerIds = mutable.HashSet[Long]()
       val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty)
-        else
+        else {
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+            transactionalBatches.map(_.producerId()).toSet.foreach(transactionalProducerIds.add(_))
+            if (transactionalBatches.nonEmpty) {
+              getPartitionOrException(topicPartition).hasOngoingTransaction(transactionalBatches.head.producerId)
+            } else { 
+              // If there is no producer ID in the batches, no need to verify.
+              true
+            }
           }
+        }
+      // We should have exactly one producer ID for transactional records
+      if (transactionalProducerIds.size > 1) {
+        throw new InvalidRecordException("Transactional records contained more than one producer ID")

Review Comment:
   Would it make sense to return `InvalidProducerIdMapping`?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -637,17 +637,31 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       
+      val transactionalProducerIds = mutable.HashSet[Long]()
       val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty)
-        else
+        else {
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+            transactionalBatches.map(_.producerId()).toSet.foreach(transactionalProducerIds.add(_))

Review Comment:
   nit: is the `toSet` necessary? Maybe we can simplify:
   ```scala
   transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
   ```
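
The two review points on this hunk (adding IDs straight into the set, then rejecting more than one producer ID) can be sketched in Java as follows. This is not the ReplicaManager code: the Batch class is a hypothetical stand-in, the hasProducerId check from the diff is folded into a single flag, and IllegalStateException replaces the exception type still under discussion.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-ins for illustration; not the ReplicaManager code. */
class ProducerIdCheckSketch {

    /** Only the two properties the check needs. */
    static final class Batch {
        final long producerId;
        final boolean transactional;

        Batch(long producerId, boolean transactional) {
            this.producerId = producerId;
            this.transactional = transactional;
        }
    }

    /** Adds IDs directly to the set; the set de-duplicates, so no intermediate set is needed. */
    static Set<Long> transactionalProducerIds(List<Batch> batches) {
        Set<Long> ids = new HashSet<>();
        for (Batch batch : batches) {
            if (batch.transactional) {
                ids.add(batch.producerId);
            }
        }
        return ids;
    }

    /** Rejects input whose transactional batches carry more than one producer ID. */
    static void validate(List<Batch> batches) {
        if (transactionalProducerIds(batches).size() > 1) {
            throw new IllegalStateException("Transactional records contained more than one producer ID");
        }
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(
            new Batch(7L, true), new Batch(7L, true), new Batch(9L, false));
        System.out.println("transactional producer IDs: " + transactionalProducerIds(batches));
        validate(batches); // passes: only producer 7 is transactional
    }
}
```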






[GitHub] [kafka] philipnee commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


philipnee commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1533526150

   Hey @machi1990 - I'll get someone to review this. Thanks.





[GitHub] [kafka] divijvaidya opened a new pull request, #13670: KAFKA-14962: Trim whitespace from ACL configuration

2023-05-03 Thread via GitHub


divijvaidya opened a new pull request, #13670:
URL: https://github.com/apache/kafka/pull/13670

   Kafka's startup can fail (see the exception trace below) if a configuration 
value has leading or trailing whitespace. This fix makes it more tolerant of 
cases where a user accidentally adds leading or trailing whitespace in ACL 
configuration.
   
   ```
   ERROR [KafkaServer id=3] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
   java.lang.IllegalArgumentException: For input string: "true "
       at scala.collection.StringOps$.toBooleanImpl$extension(StringOps.scala:943)
       at kafka.security.authorizer.AclAuthorizer.$anonfun$configure$4(AclAuthorizer.scala:153)
   ```
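
A minimal sketch of the failure and the fix, assuming a strict boolean parser that behaves like the call in the trace (only "true" or "false", case-insensitive). The class and method names are hypothetical and this is not the AclAuthorizer code.

```java
/** Hypothetical helper for illustration; not the AclAuthorizer code. */
class StrictBooleanSketch {

    /** Accepts only "true" or "false" (case-insensitive); anything else is rejected. */
    static boolean parseStrict(String value) {
        if ("true".equalsIgnoreCase(value)) {
            return true;
        }
        if ("false".equalsIgnoreCase(value)) {
            return false;
        }
        throw new IllegalArgumentException("For input string: \"" + value + "\"");
    }

    /** Same parser, but tolerant of leading and trailing whitespace. */
    static boolean parseTrimmed(String value) {
        return parseStrict(value.trim());
    }

    public static void main(String[] args) {
        System.out.println("trimmed parse: " + parseTrimmed("true "));
        try {
            parseStrict("true ");
        } catch (IllegalArgumentException e) {
            System.out.println("strict parse failed: " + e.getMessage());
        }
    }
}
```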





[GitHub] [kafka] machi1990 commented on a diff in pull request #13669: MINOR: Fix producer Callback comment

2023-05-03 Thread via GitHub


machi1990 commented on code in PR #13669:
URL: https://github.com/apache/kafka/pull/13669#discussion_r1184074412


##
clients/src/main/java/org/apache/kafka/clients/producer/Callback.java:
##
@@ -36,7 +36,7 @@ public interface Callback {
  *  Non-Retriable exceptions (fatal, the message will never be sent):
  *  
  *  InvalidTopicException
- *  OffsetMetadataTooLargeException
+ *  OffsetMetadataTooLarge

Review Comment:
   good catch!
   
   A more general comment unrelated to the change: do you think we can link the 
exception classes? 






[GitHub] [kafka] jsancio merged pull request #13668: KAFKA-14963; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio merged PR #13668:
URL: https://github.com/apache/kafka/pull/13668





[GitHub] [kafka] jsancio commented on pull request #13668: KAFKA-14963; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio commented on PR #13668:
URL: https://github.com/apache/kafka/pull/13668#issuecomment-1533476771

   Uuid instances are objects, so they need to be compared with the equals 
method and not the == operator.
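
A minimal sketch of the difference, using java.util.UUID as a stand-in for Kafka's Uuid class (an assumption made so the example runs without Kafka on the classpath). Two instances built from the same bits are equal by value but are distinct objects, so == reports them as different.

```java
import java.util.UUID;

/** Illustration only; java.util.UUID stands in for org.apache.kafka.common.Uuid. */
class UuidEqualitySketch {

    /** Builds a second, distinct object that carries the same 128 bits. */
    static UUID copyOf(UUID id) {
        return new UUID(id.getMostSignificantBits(), id.getLeastSignificantBits());
    }

    /** Reference comparison: true only if both variables point at the same object. */
    static boolean sameReference(UUID a, UUID b) {
        return a == b;
    }

    /** Value comparison: true if both objects carry the same bits. */
    static boolean sameValue(UUID a, UUID b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        UUID copy = copyOf(original);
        System.out.println("== comparison:     " + sameReference(original, copy));
        System.out.println("equals comparison: " + sameValue(original, copy));
    }
}
```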





[GitHub] [kafka] jsancio commented on pull request #13668: KAFKA-14963; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio commented on PR #13668:
URL: https://github.com/apache/kafka/pull/13668#issuecomment-1533473126

   Thanks for the reviews. The following command passes:
   ```bash
   $ ./gradlew metadata:test
   ```





[GitHub] [kafka] machi1990 commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1533468874

   Thanks for the review, @philipnee. Do you know which committer we can tag for 
this to be reviewed and then merged by them?





[jira] [Created] (KAFKA-14963) Incorrect partition count metrics for kraft controllers

2023-05-03 Thread Jira
José Armando García Sancio created KAFKA-14963:
--

 Summary: Incorrect partition count metrics for kraft controllers
 Key: KAFKA-14963
 URL: https://issues.apache.org/jira/browse/KAFKA-14963
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Affects Versions: 3.4.0
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio
 Fix For: 3.4.1


It is possible for the KRaft controller to report more partitions than are 
available in the cluster. This is shown by the following test, which fails 
against 3.4.0:
{code:java}
    @Test
    public void testPartitionCountDecreased() {
        ControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);

        Uuid createTopicId = Uuid.randomUuid();
        Uuid createPartitionTopicId = new Uuid(
            createTopicId.getMostSignificantBits(),
            createTopicId.getLeastSignificantBits()
        );
        Uuid removeTopicId = new Uuid(createTopicId.getMostSignificantBits(), createTopicId.getLeastSignificantBits());
        manager.replay(topicRecord("test", createTopicId));
        manager.replay(partitionRecord(createPartitionTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        manager.replay(partitionRecord(createPartitionTopicId, 1, 0, Arrays.asList(0, 1, 2)));
        manager.replay(removeTopicRecord(removeTopicId));
        assertEquals(0, metrics.globalPartitionCount());
    }
{code}





[GitHub] [kafka] jsancio commented on a diff in pull request #13668: MINOR; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio commented on code in PR #13668:
URL: https://github.com/apache/kafka/pull/13668#discussion_r1184012897


##
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##
@@ -202,10 +202,10 @@ private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChan
             throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
         }
 
-        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+        if (fencingChange.equals(BrokerRegistrationFencingChange.FENCE)) {
             fencedBrokers.add(brokerId);
             updateBrokerStateMetrics();
-        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+        } else if (fencingChange.equals(BrokerRegistrationFencingChange.UNFENCE)) {

Review Comment:
   Yes. These changes are not needed.









[GitHub] [kafka] ijuma commented on a diff in pull request #13668: MINOR; Use equals method with Uuid

2023-05-03 Thread via GitHub


ijuma commented on code in PR #13668:
URL: https://github.com/apache/kafka/pull/13668#discussion_r1183991722


##
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##
@@ -202,10 +202,10 @@ private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChan
             throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
         }
 
-        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+        if (fencingChange.equals(BrokerRegistrationFencingChange.FENCE)) {
             fencedBrokers.add(brokerId);
             updateBrokerStateMetrics();
-        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+        } else if (fencingChange.equals(BrokerRegistrationFencingChange.UNFENCE)) {

Review Comment:
   This and the line above are not needed since they're enums, right?
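
A minimal sketch of why == is enough for enums, using a hypothetical FencingChange enum rather than the real BrokerRegistrationFencingChange. Each enum constant is a single instance, so reference comparison is exact, and == also tolerates a null operand where calling equals on it would throw a NullPointerException.

```java
/** Hypothetical enum for illustration; not the Kafka BrokerRegistrationFencingChange type. */
class EnumComparisonSketch {

    enum FencingChange { FENCE, UNFENCE, NONE }

    /** Uses == throughout; a null argument simply falls through to the default branch. */
    static String describe(FencingChange change) {
        if (change == FencingChange.FENCE) {
            return "fenced";
        } else if (change == FencingChange.UNFENCE) {
            return "unfenced";
        }
        return "unchanged";
    }

    public static void main(String[] args) {
        // valueOf returns the one shared constant, so == holds.
        System.out.println(FencingChange.valueOf("FENCE") == FencingChange.FENCE);
        System.out.println(describe(FencingChange.UNFENCE));
        System.out.println(describe(null));
    }
}
```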






[GitHub] [kafka] cmccabe merged pull request #13653: KAFKA-14946: fix NPE when merging the deltatable

2023-05-03 Thread via GitHub


cmccabe merged PR #13653:
URL: https://github.com/apache/kafka/pull/13653





[jira] [Resolved] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2023-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14016.

Fix Version/s: 3.5.0
   3.4.1
 Assignee: Philip Nee
   Resolution: Fixed

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-rebalance-should-fix
> Fix For: 3.5.0, 3.4.1
>
> Attachments: CooperativeStickyAssignorBugReproduction.java
>
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (not the memberId) when sync 
> group fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 made this work.
> After applying this change, we found that sometimes consumers will revoke 
> almost 2/3 of the partitions with cooperative enabled. This is because if a 
> consumer does a very quick re-join, other consumers will get 
> REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before 
> re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced group successfully with 
> generation 1 
>  # New consumer B1 joined and started a rebalance
>  # all consumers joined successfully and then A1 needs to revoke a partition 
> to transfer to B1
>  # A1 does a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1 re-joined, so after they send 
> syncGroup, they will get REBALANCE_IN_PROGRESS
>  # A2-A10 will revoke their partitions and re-join
> So in this rebalance almost every partition is revoked, which greatly 
> decreases the benefit of Cooperative rebalance. 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # In the server Coordinator's handleSyncGroup, when the generationId has been 
> checked and the group state is PreparingRebalance, we can send the assignment 
> along with the error code REBALANCE_IN_PROGRESS. (I think it's safe since we 
> verified the generation first.)
>  # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 
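
The client side of the proposal quoted above (its last step) can be sketched as follows. This is not the consumer coordinator code: every type and field name here is a hypothetical stand-in, and a null assignment stands for "the coordinator sent none".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-ins for illustration; not the Kafka consumer coordinator. */
class SyncGroupProposalSketch {

    enum ErrorCode { NONE, REBALANCE_IN_PROGRESS }

    static final class SyncGroupResponse {
        final ErrorCode error;
        final List<String> assignment; // null when the coordinator sent none

        SyncGroupResponse(ErrorCode error, List<String> assignment) {
            this.error = error;
            this.assignment = assignment;
        }
    }

    static final class Member {
        List<String> ownedPartitions = Collections.emptyList();
        boolean rejoinNeeded = false;

        /** Applies any assignment first, then flags a re-join if a rebalance is in progress. */
        void onSyncGroupResponse(SyncGroupResponse response) {
            if (response.assignment != null) {
                ownedPartitions = response.assignment;
            }
            if (response.error == ErrorCode.REBALANCE_IN_PROGRESS) {
                rejoinNeeded = true;
            }
        }
    }

    public static void main(String[] args) {
        Member member = new Member();
        member.onSyncGroupResponse(
            new SyncGroupResponse(ErrorCode.REBALANCE_IN_PROGRESS, Arrays.asList("P1", "P2")));
        System.out.println("owned = " + member.ownedPartitions + ", rejoinNeeded = " + member.rejoinNeeded);
    }
}
```

The point of the ordering is that the member keeps a valid assignment while it re-joins, instead of revoking everything first.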





[jira] [Resolved] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-13891.

Fix Version/s: 3.5.0
   3.4.1
   (was: 3.6.0)
   Resolution: Fixed

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset generation when sync group failed with a 
> rebalanceInProgress error. So the previous bug still exists and it may cause 
> the consumer to rebalance for many rounds before finally becoming stable.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # New rebalance started with generation 2, consumer A joined successfully, 
> but somehow, consumer A doesn't send out sync group immediately
>  # the other consumers completed sync group successfully in generation 2, 
> except consumer A.
>  # After consumer A sends out sync group, a new rebalance starts, with 
> generation 3. So consumer A gets a REBALANCE_IN_PROGRESS error in the sync 
> group response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with 
> generation 3, with the assignment (ownedPartition) in generation 1.
>  # So, now, we have sent an out-of-date ownedPartition, with unexpected 
> results
>  # *After the generation-3 rebalance, consumer A got P3/P4 partition. the 
> ownedPartition is ignored because of old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue will happen again and result in many rounds of 
> rebalance before stable*
>  





[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-03 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718993#comment-17718993
 ] 

Philip Nee commented on KAFKA-13891:


Hey [~mimaison]  - this is actually the same issue as 
https://issues.apache.org/jira/browse/KAFKA-14639

 

I think the story is a bit complicated, but all these issues should be fixed by 
KAFKA-14639. So I think we can resolve them...

KAFKA-13891 (this)

KAFKA-14016

KAFKA-14639

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.6.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset generation when sync group failed with a 
> rebalanceInProgress error. So the previous bug still exists and it may cause 
> the consumer to rebalance for many rounds before finally becoming stable.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # New rebalance started with generation 2, consumer A joined successfully, 
> but somehow, consumer A doesn't send out sync group immediately
>  # the other consumers completed sync group successfully in generation 2, 
> except consumer A.
>  # After consumer A sends out sync group, a new rebalance starts, with 
> generation 3. So consumer A gets a REBALANCE_IN_PROGRESS error in the sync 
> group response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with 
> generation 3, with the assignment (ownedPartition) in generation 1.
>  # So, now, we have sent an out-of-date ownedPartition, with unexpected 
> results
>  # *After the generation-3 rebalance, consumer A got P3/P4 partition. the 
> ownedPartition is ignored because of old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue will happen again and result in many rounds of 
> rebalance before stable*
>  





[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960307


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[GitHub] [kafka] jsancio commented on pull request #13668: MINOR; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio commented on PR #13668:
URL: https://github.com/apache/kafka/pull/13668#issuecomment-1533380250

   @mumrah this test fails without this fix:
   ```
   Gradle Test Run :metadata:test > Gradle Test Executor 12 > 
ControllerMetricsManagerTest > testPartitionCountDecreased() FAILED
   org.opentest4j.AssertionFailedError: expected: <0> but was: <2>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
   at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
   at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527)
   at 
app//org.apache.kafka.controller.ControllerMetricsManagerTest.testPartitionCountDecreased(ControllerMetricsManagerTest.java:199)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13668: MINOR; Use equals method with Uuid

2023-05-03 Thread via GitHub


jsancio opened a new pull request, #13668:
URL: https://github.com/apache/kafka/pull/13668

   Uuid is an object, so instances need to be compared with the equals method and 
not the == operator.
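
A minimal sketch of the difference, assuming `java.util.UUID` as a stand-in for Kafka's `Uuid` so that the example stays self-contained. The class and method names are hypothetical and are not taken from the PR.

```java
import java.util.UUID;

public class UuidEqualityDemo {
    // True only when both references point to the very same object.
    static boolean sameReference(UUID a, UUID b) {
        return a == b;
    }

    // True when both objects hold the same 128-bit value.
    static boolean sameValue(UUID a, UUID b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        UUID first = new UUID(1L, 2L);
        UUID second = new UUID(1L, 2L); // distinct object, same value
        System.out.println("==     : " + sameReference(first, second)); // false
        System.out.println("equals : " + sameValue(first, second));     // true
    }
}
```

A comparison with `==` can appear to work when both sides happen to be the same cached instance, which is presumably why this kind of bug only shows up in some tests.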
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183959056


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");

Review Comment:
   Sure.




[jira] [Created] (KAFKA-14962) Whitespace in ACL configuration causes Kafka startup to fail

2023-05-03 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14962:


 Summary: Whitespace in ACL configuration causes Kafka startup to 
fail
 Key: KAFKA-14962
 URL: https://issues.apache.org/jira/browse/KAFKA-14962
 Project: Kafka
  Issue Type: Bug
Reporter: Divij Vaidya
Assignee: Divij Vaidya
 Fix For: 3.6.0


Kafka's startup can fail if a configuration value has leading or trailing 
whitespace. This fix makes it more tolerant of cases where a user accidentally 
adds leading or trailing whitespace to the ACL configuration.
{code:java}
ERROR [KafkaServer id=3] Fatal error during KafkaServer startup. Prepare to 
shutdown (kafka.server.KafkaServer)

java.lang.IllegalArgumentException: For input string: "true "

at scala.collection.StringOps$.toBooleanImpl$extension(StringOps.scala:943)

at 
kafka.security.authorizer.AclAuthorizer.$anonfun$configure$4(AclAuthorizer.scala:153)
 {code}
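
The failure mode can be sketched as follows. This is an illustration only: `parseStrict` is a hypothetical strict parser written to mimic the exception in the stack trace above, and `parseTrimmed` shows one possible tolerant variant. Neither is the actual AclAuthorizer code or the actual fix.

```java
public class StrictBooleanConfigDemo {
    // Hypothetical strict parser: accepts only "true" or "false"
    // (case-insensitive) and rejects anything else, including "true ".
    static boolean parseStrict(String value) {
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        if (value.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("For input string: \"" + value + "\"");
    }

    // Tolerant variant: strips leading and trailing whitespace first.
    static boolean parseTrimmed(String value) {
        return parseStrict(value.trim());
    }

    public static void main(String[] args) {
        String raw = "true "; // note the trailing space
        try {
            parseStrict(raw);
        } catch (IllegalArgumentException e) {
            System.out.println("strict parse failed: " + e.getMessage());
        }
        System.out.println("trimmed parse: " + parseTrimmed(raw));
    }
}
```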



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1533354929

   There seem to be a few server-related failures, but I think they are 
unrelated to this change:
   
   I believe JDK17 tests and most of the JDK11 tests passed.
   
   ```
   Build / JDK 11 and Scala 2.13 / testOffsetSyncsTopicsOnTarget() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   3m 24s
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   1m 51s
   Build / JDK 8 and Scala 2.12 / testLogCleanerConfig(String).quorum=kraft – 
kafka.server.DynamicBrokerReconfigurationTest
   28s
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, 
MetadataVersion=3.4-IV0, Security=PLAINTEXT – 
kafka.zk.ZkMigrationIntegrationTest
   2m 7s
   Existing failures - 23
   Build / JDK 8 and Scala 2.12 / executionError – 
kafka.server.DynamicBrokerReconfigurationTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.EdgeCaseRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.FetchRequestDownConversionConfigTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.FetchRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.FetchRequestWithLegacyMessageFormatTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.FinalizedFeatureChangeListenerTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.GssapiAuthenticationTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.KafkaMetricReporterClusterIdTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.KafkaMetricReporterExceptionHandlingTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.KafkaMetricsReporterTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.KafkaServerTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.ListOffsetsRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.LogDirFailureTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.LogRecoveryTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.MetadataRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.MultipleListenersWithAdditionalJaasContextTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.OffsetFetchRequestTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.ServerGenerateClusterIdTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.ServerShutdownTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.UpdateFeaturesTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.zk.KafkaZkClientTest
   <1s
   Build / JDK 8 and Scala 2.12 / initializationError – 
kafka.zk.ZkMigrationClientTest
   <1s
   ```





[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183933777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.

Review Comment:
   Will do.






[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183933309


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List 
ownedTopicPartitions;
+
+/**
+   

[GitHub] [kafka] machi1990 commented on a diff in pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 commented on code in PR #13664:
URL: https://github.com/apache/kafka/pull/13664#discussion_r1183913810


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -89,11 +90,14 @@ public void setup() {
 }
 
 @Test
-public void testStartupAndTearDown() {
+public void testStartupAndTearDown() throws InterruptedException {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
 DefaultBackgroundThread backgroundThread = mockBackgroundThread();
 backgroundThread.start();
-assertTrue(backgroundThread.isRunning());
+TestUtils.waitForCondition(backgroundThread::isRunning, "Failed 
awaiting for the background thread to be running");
 backgroundThread.close();
+assertFalse(backgroundThread.isRunning());

Review Comment:
   Thanks for the review @kirktrue 
   
   It is guaranteed that `isRunning()` will return false once `close()` has been called: 
https://github.com/apache/kafka/blob/6c3e82f69a081b760a685817332ea0a2f7a2a6a8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java#L230
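
The pattern used in the hardened test can be sketched roughly as below. This is a simplified stand-in, not the real `DefaultBackgroundThread` or `TestUtils`: the `Worker` class, its `closed` flag, and this `waitForCondition` helper are all hypothetical. The point is that the running flag becomes true asynchronously after `start()`, so the test polls for it, while `close()` leaves the flag false by the time it returns, so a direct assertion is safe there.

```java
import java.util.function.BooleanSupplier;

public class RunningFlagDemo {
    // Hypothetical background thread exposing a volatile running flag.
    static class Worker extends Thread {
        private volatile boolean running;
        private volatile boolean closed;

        @Override
        public void run() {
            running = true; // set asynchronously, some time after start()
            try {
                while (!closed) {
                    Thread.sleep(5);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running = false;
            }
        }

        boolean isRunning() {
            return running;
        }

        // Requests shutdown and waits for run() to finish, so the flag
        // is false once this method returns normally.
        void close() {
            closed = true;
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Polls the condition until it holds or the timeout expires.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError(message, e);
            }
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.start();
        waitForCondition(worker::isRunning, 5000, "worker did not start");
        worker.close();
        System.out.println("running after close: " + worker.isRunning());
    }
}
```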






[GitHub] [kafka] lucasbru opened a new pull request, #13667: KAFKA-12693: ALOS fencing

2023-05-03 Thread via GitHub


lucasbru opened a new pull request, #13667:
URL: https://github.com/apache/kafka/pull/13667

   When an instance (or a thread within an instance) of Kafka Streams has a soft 
failure and the group coordinator triggers a rebalance, that instance 
temporarily becomes a "zombie writer". That is, the instance does not know that 
a rebalance has already happened and that its partitions have been migrated 
away until it tries to commit, is notified of the illegal-generation error, and 
realizes that it is already the "zombie". During the period before that commit, 
the zombie may still be writing data to the changelogs of the migrated tasks 
while the new owner, which has already taken over, is also writing to the same 
changelogs.
   
   When EOS is enabled, this is not a problem: when the zombie tries to commit 
and is notified that it has been fenced, its appends are aborted. With EOS 
disabled, though, such shared writes are interleaved on the changelogs, and a 
zombie append may arrive after the new writer's append, effectively overwriting 
it.
   
   Note that such interleaved writes do not necessarily corrupt data: as long 
as the new producer keeps appending after the old zombie stops and all the 
corrupted keys are overwritten again with the new values, the result is fine. 
However, with consecutive rebalances, the task may be migrated again right 
after the changelogs are corrupted by zombie writers and before the new writer 
can overwrite them. The task then has to be restored from the changelogs, so 
the old values are restored instead of the new ones, effectively causing data 
loss.
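
The scenario can be illustrated with a toy model. This sketch assumes a changelog is simply an ordered list of key/value appends and that a restore replays them with last-write-wins semantics. `ZombieChangelogDemo` and `restore` are hypothetical names, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZombieChangelogDemo {
    // Rebuilds a key-value store by replaying changelog appends in log
    // order; the last append for each key wins.
    static Map<String, String> restore(List<String[]> changelog) {
        Map<String, String> store = new HashMap<>();
        for (String[] entry : changelog) {
            store.put(entry[0], entry[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        changelog.add(new String[] {"k", "new-value"});   // append from the new owner
        changelog.add(new String[] {"k", "stale-value"}); // late append from the zombie

        // A restore at this point brings back the zombie's stale value.
        System.out.println(restore(changelog).get("k")); // stale-value

        // If the new owner gets to write the key again, the log heals.
        changelog.add(new String[] {"k", "newer-value"});
        System.out.println(restore(changelog).get("k")); // newer-value
    }
}
```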
   
   This change adds a new producer configuration to enable registering a 
transaction ID, but without requiring the client to use any of the 
transactional operations. This enables using the transaction ID to fence the 
zombie producer, just as with EOS, but when in ALOS mode. The new configuration 
is used in the streams producer when ALOS is enabled.
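
The fencing idea can be sketched with a toy in-memory log. This is a deliberately simplified model, not the transaction coordinator protocol and not the new configuration from this PR: it only assumes that registering an ID bumps an epoch and that appends carrying an older epoch are rejected. All names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EpochFencingDemo {
    // Hypothetical in-memory log that tracks the latest epoch per producer ID.
    static class FencedLog {
        private final Map<String, Integer> latestEpoch = new HashMap<>();
        final List<String> records = new ArrayList<>();

        // Registering an ID bumps its epoch; earlier holders become stale.
        int register(String id) {
            return latestEpoch.merge(id, 1, Integer::sum);
        }

        // Accepts the append only if it carries the latest epoch for the ID.
        boolean append(String id, int epoch, String value) {
            if (latestEpoch.getOrDefault(id, 0) != epoch) {
                return false; // fenced: a newer producer registered this ID
            }
            records.add(value);
            return true;
        }
    }

    public static void main(String[] args) {
        FencedLog log = new FencedLog();
        int zombieEpoch = log.register("task-0_1");   // original owner
        int newOwnerEpoch = log.register("task-0_1"); // new owner after the rebalance

        System.out.println(log.append("task-0_1", zombieEpoch, "stale-value")); // false
        System.out.println(log.append("task-0_1", newOwnerEpoch, "new-value")); // true
        System.out.println(log.records); // [new-value]
    }
}
```

In this model the zombie's late append never reaches the log, so a later restore cannot bring back its stale value.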
   
   The change includes an ALOS integration test that reliably fails without the 
`transaction.id`, and reliably passes with the `transaction.id`.





[GitHub] [kafka] mimaison commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-05-03 Thread via GitHub


mimaison commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1533310873

   Thanks @clolov for the update. There are a few checkstyle failures in 
LogDirsCommandTest.java.





[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183889134


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.

Review Comment:
   Could we change the comment to something like "The epoch of the member when 
the state was last updated"?






[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183887136


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List 
ownedTopicPartitions;
+
+/**
+ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1183887136


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List 
ownedTopicPartitions;
+
+/**
+ 

[GitHub] [kafka] chia7712 commented on pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-03 Thread via GitHub


chia7712 commented on PR #13659:
URL: https://github.com/apache/kafka/pull/13659#issuecomment-1533287238

   @kirktrue thanks for the feedback. I will address it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-03 Thread via GitHub


kirktrue commented on PR #13659:
URL: https://github.com/apache/kafka/pull/13659#issuecomment-1533285689

   This looks good to me.
   
   I looked briefly at the code that calls 
[`configure()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java#L307).
 I was a little surprised, though: when I ran the 
[`ConsumerPartitionAssignorTest`](https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java)
 with code coverage enabled, those particular tests weren't actually exercising 
that functionality.

   Would you be up for adding a test or two to `ConsumerPartitionAssignorTest` 
to make sure that functionality is properly covered? If not, I'm happy to add 
it myself.
   
   Thanks!





[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-05-03 Thread via GitHub


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1183882730


##
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+@Test
+public void shouldThrowWhenQueryingNonExistentBrokers() {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new 
MockAdminClient(Collections.singletonList(broker), broker)) {
+assertThrows(RuntimeException.class, () -> 
execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", 
"0,1,2", "--describe"), adminClient));
+}
+}
+
+@Test
+public void shouldNotThrowWhenDuplicatedBrokers() throws 
JsonProcessingException {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new 
MockAdminClient(Collections.singletonList(broker), broker)) {
+String standardOutput = 
execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", 
"1,1", "--describe"), adminClient);
+String[] standardOutputLines = standardOutput.split("\n");
+assertEquals(3, standardOutputLines.length);
+@SuppressWarnings("unchecked")
+Map information = new 
ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+@SuppressWarnings("unchecked")
+List brokerInformation = (List) 
information.get("brokers");
+@SuppressWarnings("unchecked")
+Integer brokerId = (Integer) ((HashMap) 
brokerInformation.get(0)).get("broker");
+assertEquals(1, brokerInformation.size());
+assertEquals(1, brokerId);
+}
+}
+
+@Test
+public void shouldQueryAllBrokersIfNonSpecified() throws 
JsonProcessingException {
+Node brokerOne = new Node(1, "hostname", 9092);
+Node brokerTwo = new Node(2, "hostname", 9092);
+try (MockAdminClient adminClient = new 
MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+String standardOutput = 
execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), 
adminClient);
+String[] standardOutputLines = standardOutput.split("\n");
+assertEquals(3, standardOutputLines.length);
+@SuppressWarnings("unchecked")
+Map information = new 
ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+@SuppressWarnings("unchecked")
+List brokerInformation = (List) 
information.get("brokers");
+@SuppressWarnings("unchecked")
+Integer brokerOneId = (Integer) ((HashMap) 
brokerInformation.get(0)).get("broker");
+@SuppressWarnings("unchecked")
+Integer brokerTwoId = (Integer) ((HashMap) 
brokerInformation.get(1)).get("broker");

Review Comment:
   You are absolutely correct, order is not guaranteed. In the newest commit I 
check that the elements are present in a set. Thank you for catching this!
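
   A minimal sketch of the set-based check described above, not the code from the PR. `BrokerIdSetCheck` and `brokerIds` are hypothetical names, and plain integers stand in for the broker entries parsed from the JSON output.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BrokerIdSetCheck {
    // Hypothetical helper: collects the reported broker ids into a set,
    // so the comparison no longer depends on the order of the entries.
    static Set<Integer> brokerIds(List<Integer> reported) {
        return new HashSet<>(reported);
    }

    public static void main(String[] args) {
        Set<Integer> expected = new HashSet<>(Arrays.asList(1, 2));
        // Both orderings of the same brokers compare equal as sets.
        System.out.println(brokerIds(Arrays.asList(2, 1)).equals(expected));
        System.out.println(brokerIds(Arrays.asList(1, 2)).equals(expected));
    }
}
```

   Comparing sets keeps the assertion on which brokers are present while dropping the assumption about their position in the list.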






[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-05-03 Thread via GitHub


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1183881591


##
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+@Test
+public void shouldThrowWhenQueryingNonExistentBrokers() {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new 
MockAdminClient(Collections.singletonList(broker), broker)) {
+assertThrows(RuntimeException.class, () -> 
execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", 
"0,1,2", "--describe"), adminClient));
+}
+}
+
+@Test
+public void shouldNotThrowWhenDuplicatedBrokers() throws 
JsonProcessingException {
+Node broker = new Node(1, "hostname", 9092);
+try (MockAdminClient adminClient = new 
MockAdminClient(Collections.singletonList(broker), broker)) {
+String standardOutput = 
execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", 
"1,1", "--describe"), adminClient);
+String[] standardOutputLines = standardOutput.split("\n");
+assertEquals(3, standardOutputLines.length);
+@SuppressWarnings("unchecked")

Review Comment:
   Okay, moved it to the top of the methods!






[GitHub] [kafka] kirktrue commented on a diff in pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


kirktrue commented on code in PR #13664:
URL: https://github.com/apache/kafka/pull/13664#discussion_r1183863358


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -89,11 +90,14 @@ public void setup() {
 }
 
 @Test
-public void testStartupAndTearDown() {
+public void testStartupAndTearDown() throws InterruptedException {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
 DefaultBackgroundThread backgroundThread = mockBackgroundThread();
 backgroundThread.start();
-assertTrue(backgroundThread.isRunning());
+TestUtils.waitForCondition(backgroundThread::isRunning, "Failed 
awaiting for the background thread to be running");
 backgroundThread.close();
+assertFalse(backgroundThread.isRunning());

Review Comment:
   Is it possible that `isRunning()` could take some time to return `false` or 
is it guaranteed to be set by the time `close()` is finished?
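
   For illustration only, here is one design under which the answer would be "guaranteed": `close()` joins the thread before returning. This is an assumption about a possible approach, not the behavior of `DefaultBackgroundThread`; `JoiningWorker` and all of its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

class JoiningWorker extends Thread {
    private final CountDownLatch started = new CountDownLatch(1);
    private volatile boolean running = false;
    private volatile boolean closed = false;

    @Override
    public void run() {
        running = true;
        started.countDown();
        try {
            while (!closed) {
                Thread.sleep(5); // placeholder for real work
            }
        } catch (InterruptedException e) {
            // close() interrupts the sleep; fall through to cleanup
        } finally {
            running = false;
        }
    }

    boolean isRunning() {
        return running;
    }

    // Blocks until run() has set the running flag.
    void awaitStarted() {
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Signals the loop to stop, then waits for the thread to exit.
    // Because of the join, the finally block in run() has completed
    // by the time this method returns.
    void close() {
        closed = true;
        interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        JoiningWorker worker = new JoiningWorker();
        worker.start();
        worker.awaitStarted();
        System.out.println(worker.isRunning()); // prints "true"
        worker.close();
        System.out.println(worker.isRunning()); // prints "false"
    }
}
```

   If `close()` only set a flag without joining, an immediate `assertFalse(isRunning())` could race with the thread's cleanup and would need a wait-for-condition instead.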






[GitHub] [kafka] kirktrue commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-03 Thread via GitHub


kirktrue commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1183846777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(threadPrefix + threadId)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends ShutdownableThread {
+EventProcessorThread(
+String name
+) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+while (!shuttingDown) {

Review Comment:
   Should the `while` loop condition also include checking some of the methods 
in `ShutdownableThread`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**

[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-05-03 Thread via GitHub


sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1533239690

   Hi @vamossagar12,
   I can set that, but `endoffsets` is actually getting populated with the right 
values. The issue is with the way `lastConsumedOffset` is being used.
   





[jira] [Assigned] (KAFKA-12693) Consecutive rebalances with zombie instances may cause corrupted changelogs

2023-05-03 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-12693:
--

Assignee: Lucas Brutschy

> Consecutive rebalances with zombie instances may cause corrupted changelogs
> ---
>
> Key: KAFKA-12693
> URL: https://issues.apache.org/jira/browse/KAFKA-12693
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: new-streams-runtime-should-fix, streams
>
> When an instance (or a thread within an instance) of Kafka Streams has a soft 
> failure and the group coordinator triggers a rebalance, that instance 
> temporarily becomes a "zombie writer". That is, the instance does not know 
> that a new rebalance has happened and that its partitions have been migrated 
> away until it tries to commit, gets the illegal-generation error, and 
> realizes that it is the "zombie". Until that commit, the zombie may still be 
> writing data to the changelogs of the migrated tasks, while the new owner has 
> already taken over and is also writing to those changelogs.
> When EOS is enabled, this is not a problem: when the zombie tries to commit 
> and is notified that it is fenced, its zombie appends are aborted. With EOS 
> disabled, though, such shared writes are interleaved on the changelogs, where 
> a zombie append may arrive after the new writer's append, effectively 
> overwriting that new append.
> Note that such interleaved writes do not necessarily corrupt data: as long as 
> the new producer keeps appending after the old zombie stops, and all the 
> corrupted keys are overwritten again by the new values, it is fine. However, 
> if there are consecutive rebalances where, right after the changelogs are 
> corrupted by zombie writers and before the new writer can overwrite them 
> again, the task gets migrated again and needs to be restored from the 
> changelogs, the old values would be restored instead of the new values, 
> effectively causing data loss.
> Although this should be a rare event, we should still fix it asap. One idea 
> is to have producers get a PID even under ALOS: that is, we set the 
> transactional id in the producer config but do not trigger any txn APIs; 
> zombie producers would then be fenced immediately on append, and hence there 
> would be no interleaved appends. I think this may still require a KIP, since 
> today one has to call initTxn in order to register and get the PID.
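
The interleaving described above can be sketched with a toy model. This is an illustration under simplifying assumptions, not Kafka Streams code: the changelog is a plain list of key/value appends, restoration is "last write per key wins", and `ChangelogInterleavingSketch` and `restore` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogInterleavingSketch {
    // Replays the changelog in append order; the last value per key wins.
    static Map<String, String> restore(List<String[]> changelog) {
        Map<String, String> state = new HashMap<>();
        for (String[] record : changelog) {
            state.put(record[0], record[1]);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        changelog.add(new String[] {"k", "new"});   // append by the new owner
        changelog.add(new String[] {"k", "stale"}); // late append by the zombie
        // A restore at this point brings back the zombie's value.
        System.out.println(restore(changelog).get("k")); // prints "stale"

        // If the new owner appends again before any restore, the key heals.
        changelog.add(new String[] {"k", "newer"});
        System.out.println(restore(changelog).get("k")); // prints "newer"
    }
}
```

In this model the data loss only occurs when a restore happens in the window between the zombie's late append and the new owner's next write for that key, which matches the consecutive-rebalance scenario in the description.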



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac opened a new pull request, #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-03 Thread via GitHub


dajac opened a new pull request, #13666:
URL: https://github.com/apache/kafka/pull/13666

   Adds CoordinatorEvent, CoordinatorEventProcessor, and 
MultiThreadedEventProcessor.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-03 Thread via GitHub


divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183658413


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
 Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+try {
+  val logCleaner = new LogCleaner(new CleanerConfig(true),
+logDirs = Array(TestUtils.tempDir()),
+logs = new Pool[TopicPartition, UnifiedLog](),
+logDirFailureChannel = new LogDirFailureChannel(1),
+time = time)
+
+  // shutdown logCleaner so that metrics are removed
+  logCleaner.shutdown()
+
+  val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+  val numMetricsRegistered = 5
+  verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   It is intentionally left generic so that it captures every `gauge` registered in 
this class. If someone adds a gauge and doesn't update this test, this line 
will fail. Hence, matching any string makes this test stricter. 
   
   We don't need to do something similar for remove because we validate in the next 
statement that the number of "added" metrics equals the number of metric names. 
This ensures that all added metrics are in MetricNames, and later we 
validate that all MetricNames are removed.
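
   The reasoning above can be sketched without Mockito as follows. This is a simplified illustration, not the test in the PR: `RecordingGroup` is a hypothetical stand-in for the mocked metrics group, `lifecycleIsConsistent` is a hypothetical helper, and the metric names are the ones listed in the `LogCleaner` diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MetricsLifecycleSketch {
    // Hypothetical stand-in for a metrics group: records what was
    // registered and what was removed.
    static class RecordingGroup {
        final List<String> added = new ArrayList<>();
        final Set<String> removed = new HashSet<>();

        void newGauge(String name) { added.add(name); }

        void removeMetric(String name) { removed.add(name); }
    }

    static final Set<String> METRIC_NAMES = new HashSet<>(Arrays.asList(
        "max-buffer-utilization-percent",
        "cleaner-recopy-percent",
        "max-clean-time-secs",
        "max-compaction-delay-secs",
        "DeadThreadCount"));

    // Holds when the number of registered gauges equals the number of
    // known names and every known name was removed.
    static boolean lifecycleIsConsistent(RecordingGroup group, Set<String> names) {
        return group.added.size() == names.size() && group.removed.equals(names);
    }

    public static void main(String[] args) {
        RecordingGroup group = new RecordingGroup();
        for (String name : METRIC_NAMES) {
            group.newGauge(name);
            group.removeMetric(name);
        }
        System.out.println(lifecycleIsConsistent(group, METRIC_NAMES));

        // A gauge added without updating the name set breaks the count check.
        group.newGauge("forgotten-gauge");
        System.out.println(lifecycleIsConsistent(group, METRIC_NAMES));
    }
}
```

   Counting registrations with a name-agnostic matcher is what makes a newly added gauge fail the check even when nobody remembered to list it.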






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-03 Thread via GitHub


divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183654795


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -466,6 +472,17 @@ object LogCleaner {
   config.logCleanerEnable)
 
   }
+
+  private val MaxBufferUtilizationPercentMetricName = 
"max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName,

Review Comment:
   fixed in latest commit.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-03 Thread via GitHub


divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183654517


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
 Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+try {
+  val logCleaner = new LogCleaner(new CleanerConfig(true),
+logDirs = Array(TestUtils.tempDir()),
+logs = new Pool[TopicPartition, UnifiedLog](),
+logDirFailureChannel = new LogDirFailureChannel(1),
+time = time)
+
+  // shutdown logCleaner so that metrics are removed
+  logCleaner.shutdown()
+
+  val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+  val numMetricsRegistered = 5
+  verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
+
+  // verify that all metrics are added to the list of metric name
+  assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered,
+"All metrics are not part of MetricNames collections")
+
+  // verify that each metric is removed
+  LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+  // assert that we have verified all invocations on
+  verifyNoMoreInteractions(mockMetricsGroup)
+} finally {
+  if (mockMetricsGroupCtor != null) {

Review Comment:
   Removed in latest commit






[GitHub] [kafka] machi1990 opened a new pull request, #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-05-03 Thread via GitHub


machi1990 opened a new pull request, #13665:
URL: https://github.com/apache/kafka/pull/13665

   Opening as a WIP as I need to look into adding more tests. I'll promote this 
once that's done.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] machi1990 commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-153294

   @philipnee could you have a look? Thanks





[GitHub] [kafka] machi1990 commented on a diff in pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 commented on code in PR #13664:
URL: https://github.com/apache/kafka/pull/13664#discussion_r1183579541


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -89,11 +90,14 @@ public void setup() {
 }
 
 @Test
-public void testStartupAndTearDown() {
+public void testStartupAndTearDown() throws InterruptedException {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());

Review Comment:
   These stub calls were missing, which caused the NPE described in the JIRA






[GitHub] [kafka] machi1990 commented on a diff in pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 commented on code in PR #13664:
URL: https://github.com/apache/kafka/pull/13664#discussion_r1183579038


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -89,11 +90,14 @@ public void setup() {
 }
 
 @Test
-public void testStartupAndTearDown() {
+public void testStartupAndTearDown() throws InterruptedException {
+
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
 DefaultBackgroundThread backgroundThread = mockBackgroundThread();
 backgroundThread.start();
-assertTrue(backgroundThread.isRunning());
+TestUtils.waitForCondition(backgroundThread::isRunning, "Failed 
awaiting for the background thread to be running");

Review Comment:
   This will wait up to 15s, the default timeout. It should be good enough!
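
   For readers unfamiliar with the pattern, the idea of replacing a one-shot 
assertion with a bounded wait can be sketched as below. This is a minimal 
illustration only: `WaitSketch` and its parameters are hypothetical names, 
not the actual `TestUtils` implementation, and the poll interval is an 
assumption.

```java
import java.util.function.BooleanSupplier;

final class WaitSketch {
    // Hypothetical stand-in for a wait-for-condition test helper: polls
    // `condition` every `pollMs` until it holds or `timeoutMs` has elapsed.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs, String message) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                // Condition never became true within the timeout: fail the test.
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail rather than swallow it.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }
}
```

   A test would call something like `WaitSketch.waitForCondition(thread::isRunning, 15_000L, 100L, "...")` 
right after `start()`, so a slow thread start delays the test instead of 
failing it.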






[GitHub] [kafka] machi1990 opened a new pull request, #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-03 Thread via GitHub


machi1990 opened a new pull request, #13664:
URL: https://github.com/apache/kafka/pull/13664

   1. Ensures that NPEs are not thrown
   2. Ensures that the background thread has been started, to avoid flaky 
assertion failures on isRunning
   3. Adds a check that the thread is not running when closed
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14961) DefaultBackgroundThreadTest.testStartupAndTearDown test is flasky

2023-05-03 Thread Manyanda Chitimbo (Jira)
Manyanda Chitimbo created KAFKA-14961:
-

 Summary: DefaultBackgroundThreadTest.testStartupAndTearDown test 
is flasky
 Key: KAFKA-14961
 URL: https://issues.apache.org/jira/browse/KAFKA-14961
 Project: Kafka
  Issue Type: Test
Reporter: Manyanda Chitimbo
Assignee: Manyanda Chitimbo


When running the test suite locally I noticed the following error
{code:java}
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at 
app//org.apache.kafka.clients.consumer.internals.DefaultBackgroundThreadTest.testStartupAndTearDown(DefaultBackgroundThreadTest.java:95)
 {code}
which happened only once; I could not reproduce it again.

I further noticed some NPE in debug logs in the form of
{code:java}
 ERROR The background thread failed due to unexpected error 
(org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread:166)
java.lang.NullPointerException
    at 
org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.handlePollResult(DefaultBackgroundThread.java:200)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at 
java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675)
    at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:553)
    at 
org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.runOnce(DefaultBackgroundThread.java:187)
    at 
org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.run(DefaultBackgroundThread.java:159)
 {code}
which is due to missing stubs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tinaselenge commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-03 Thread via GitHub


tinaselenge commented on code in PR #13660:
URL: https://github.com/apache/kafka/pull/13660#discussion_r1183526354


##
docs/security.html:
##
@@ -2089,6 +2089,144 @@ https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java#L101).
  However, looking at the code again, I don't think the ClusterAction operation 
is checked as part of the authorization check for them; it is only used later for 
throttling the requests. So I think I should remove them. What do you think?






[GitHub] [kafka] tinaselenge commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-03 Thread via GitHub


tinaselenge commented on code in PR #13660:
URL: https://github.com/apache/kafka/pull/13660#discussion_r1183527261


##
docs/security.html:
##
@@ -2089,6 +2089,144 @@ 

[GitHub] [kafka] tinaselenge commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-03 Thread via GitHub


tinaselenge commented on code in PR #13660:
URL: https://github.com/apache/kafka/pull/13660#discussion_r1183526354


##
docs/security.html:
##
@@ -2089,6 +2089,144 @@ https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java#L101).
  However, looking at them again, I'm not sure where the ClusterAction operation 
is checked for them. I only see Describe or Alter operations being checked on 
the Cluster resource. 






[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-03 Thread via GitHub


cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183390548


##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##
@@ -106,7 +106,11 @@ private static class ValueList {
 }
 
 int next() {
-return (index < values.length) ? values[index++] : -1;
+final int v = values[index++];
+if (index >= values.length) {
+index = 0;
+}

Review Comment:
   Doesn't this risk introducing a lot of disorder into the timestamps? I am 
referring to the comment on line 100. What are the consequences of such 
disorder? 
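
   To make the behavioural change in this hunk concrete, here is a minimal 
sketch of the two variants of `next()` side by side. `ValueListSketch` is a 
hypothetical stand-in for the test's `ValueList`, and the `return v;` in the 
wrapping variant is an assumption, since the hunk above is cut off before 
the end of the method.

```java
final class ValueListSketch {
    private final int[] values;
    private int index = 0;

    ValueListSketch(int[] values) {
        this.values = values;
    }

    // Variant before the change: once the values are exhausted, return -1.
    int nextOrMinusOne() {
        return (index < values.length) ? values[index++] : -1;
    }

    // Variant after the change: wrap around and start again from the first value.
    int nextWrapping() {
        final int v = values[index++];
        if (index >= values.length) {
            index = 0;
        }
        return v; // assumed; not visible in the quoted hunk
    }
}
```

   With values `{1, 2}`, three calls to the old variant yield 1, 2, -1, while 
three calls to the new variant yield 1, 2, 1. Previously emitted values are 
therefore emitted again after a wrap.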



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version  and 
upgrades one-by-one to 
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)

Review Comment:
   I do not understand `from_version[:-2]` here. Doesn't this return a sublist? 



##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
 final static int END = Integer.MAX_VALUE;
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic) {
+return printTaskProcessorSupplier(topic, "");
+}
+
 static ProcessorSupplier 
printProcessorSupplier(final String topic) {
 return printProcessorSupplier(topic, "");
 }
 
+static ProcessorSupplier 
printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   It seems the parameter `name` is not used.



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version  and 
upgrades one-by-one to 
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)

Review Comment:
   Do we have any guarantee that the instance on the new version do actually 

[GitHub] [kafka] Hangleton commented on a diff in pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-03 Thread via GitHub


Hangleton commented on code in PR #12331:
URL: https://github.com/apache/kafka/pull/12331#discussion_r1183474170


##
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
##
@@ -139,16 +139,17 @@ private void writeElectionStateToFile(final File 
stateFile, QuorumStateData stat
 
 log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-try (final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
- final BufferedWriter writer = new BufferedWriter(
- new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8))) {
+final OpenOption[] options = {StandardOpenOption.WRITE,
+StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE};
+
+try (BufferedWriter writer = Files.newBufferedWriter(temp.toPath(), 
StandardCharsets.UTF_8, options)) {
 short version = state.highestSupportedVersion();
 
 ObjectNode jsonState = (ObjectNode) 
QuorumStateDataJsonConverter.write(state, version);
 jsonState.set(DATA_VERSION, new ShortNode(version));
 writer.write(jsonState.toString());
 writer.flush();
-fileOutputStream.getFD().sync();
+writer.close();

Review Comment:
   Note that the try-with-resources statement also invokes `close`.
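
   A small sketch of why the explicit call is redundant, using a hypothetical 
`CountingResource` that only counts how often `close` is invoked (it is not 
part of the PR):

```java
final class CountingResource implements AutoCloseable {
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

final class TryWithResourcesSketch {
    // Explicit close() inside the block, plus the implicit one at block exit.
    static int withExplicitClose() {
        CountingResource resource = new CountingResource();
        try (CountingResource r = resource) {
            r.close();
        }
        return resource.closeCalls;
    }

    // Only the implicit close() performed by try-with-resources.
    static int withoutExplicitClose() {
        CountingResource resource = new CountingResource();
        try (CountingResource r = resource) {
            // the body would do its writes here
        }
        return resource.closeCalls;
    }
}
```

   `withExplicitClose()` returns 2 and `withoutExplicitClose()` returns 1. For 
a `BufferedWriter` the second call is harmless, since the `Closeable` contract 
says closing an already closed stream has no effect, but it adds nothing.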



##
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##
@@ -74,13 +74,28 @@ public CheckpointFile(File file,
 
 public void write(Collection entries) throws IOException {
 synchronized (lock) {
+final OpenOption[] options = {StandardOpenOption.WRITE,
+StandardOpenOption.CREATE, StandardOpenOption.SPARSE};
+
 // write to temp file and then swap with the existing file
-try (FileOutputStream fileOutputStream = new 
FileOutputStream(tempPath.toFile());
- BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-CheckpointWriteBuffer checkpointWriteBuffer = new 
CheckpointWriteBuffer<>(writer, version, formatter);
-checkpointWriteBuffer.write(entries);
+
+try (BufferedWriter writer = Files.newBufferedWriter(tempPath, 
StandardCharsets.UTF_8, options)) {
+// Write the version
+writer.write(Integer.toString(version));
+writer.newLine();
+
+// Write the entries count
+writer.write(Integer.toString(entries.size()));
+writer.newLine();
+
+// Write each entry on a new line.
+for (T entry : entries) {
+writer.write(formatter.toString(entry));
+writer.newLine();
+}
+
 writer.flush();
-fileOutputStream.getFD().sync();
+writer.close();

Review Comment:
   Ditto.



##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -461,13 +460,11 @@ private static FileChannel openChannel(File file,
int initFileSize,
boolean preallocate) throws 
IOException {
 if (mutable) {
-if (fileAlreadyExists || !preallocate) {
+if (preallocate && !fileAlreadyExists) {

Review Comment:
   Why negate the condition and swap the statements from the if and else 
clauses?



##
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##
@@ -74,13 +74,28 @@ public CheckpointFile(File file,
 
 public void write(Collection entries) throws IOException {
 synchronized (lock) {
+final OpenOption[] options = {StandardOpenOption.WRITE,
+StandardOpenOption.CREATE, StandardOpenOption.SPARSE};

Review Comment:
   Should `SPARSE` always be set?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java:
##
@@ -86,27 +87,42 @@ public AbstractIndex(File file, long baseOffset, int 
maxIndexSize, boolean writa
 
 private void createAndAssignMmap() throws IOException {
 boolean newlyCreated = file.createNewFile();
-RandomAccessFile raf;
+FileChannel channel;
 if (writable)
-raf = new RandomAccessFile(file, "rw");
+channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, 
StandardOpenOption.WRITE, StandardOpenOption.SPARSE);
 else
-raf = new RandomAccessFile(file, "r");
+channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
 
 try {
 /* pre-allocate the file if necessary */
 if (newlyCreated) {
 if (maxIndexSize < entrySize())
 throw new IllegalArgumentException("Invalid max index 
size: " + maxIndexSize);
-

[jira] [Updated] (KAFKA-14946) KRaft controller node shutting down while renouncing leadership

2023-05-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14946:
--
Fix Version/s: 3.4.1

> KRaft controller node shutting down while renouncing leadership
> ---
>
> Key: KAFKA-14946
> URL: https://issues.apache.org/jira/browse/KAFKA-14946
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.3.1
>Reporter: Akshay Kumar
>Assignee: Luke Chen
>Priority: Critical
> Fix For: 3.5.0, 3.4.1
>
>
> * We are using ZooKeeper-less Kafka (Kafka KRaft).
>  * The cluster has 3 nodes.
>  * One of the nodes shuts down on its own at random times.
>  * We checked the logs but could not find the exact reason.
>  * Kafka version - 3.3.1
>  * Attaching the log files. 
>  * Time - 2023-04-21 16:28:23
> *state-change.log -*
> [https://drive.google.com/file/d/1eS-ShKlhGPsIJoybHndlhahJnucU8RWA/view?usp=share_link]
>  
> *server.log -*
> [https://drive.google.com/file/d/1Ov5wrQIqx2AS4J7ppFeHJaDySsfsK588/view?usp=share_link]





[jira] [Updated] (KAFKA-14946) KRaft controller node shutting down while renouncing leadership

2023-05-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14946:
--
Fix Version/s: 3.5.0

> KRaft controller node shutting down while renouncing leadership
> ---
>
> Key: KAFKA-14946
> URL: https://issues.apache.org/jira/browse/KAFKA-14946
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.3.1
>Reporter: Akshay Kumar
>Assignee: Luke Chen
>Priority: Critical
> Fix For: 3.5.0
>
>
> * We are using ZooKeeper-less Kafka (Kafka KRaft).
>  * The cluster has 3 nodes.
>  * One of the nodes shuts down on its own at random times.
>  * We checked the logs but could not find the exact reason.
>  * Kafka version - 3.3.1
>  * Attaching the log files. 
>  * Time - 2023-04-21 16:28:23
> *state-change.log -*
> [https://drive.google.com/file/d/1eS-ShKlhGPsIJoybHndlhahJnucU8RWA/view?usp=share_link]
>  
> *server.log -*
> [https://drive.google.com/file/d/1Ov5wrQIqx2AS4J7ppFeHJaDySsfsK588/view?usp=share_link]





[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-03 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183397975


##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable {
+
+private final Logger logger;
+private final RemoteStorageFetchInfo fetchInfo;
+private final RemoteLogManager rlm;
+private final Consumer callback;
+
+public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+   RemoteLogManager rlm,
+   Consumer callback) {
+this.fetchInfo = fetchInfo;
+this.rlm = rlm;
+this.callback = callback;
+logger = new LogContext() {
+@Override
+public String logPrefix() {
+return "[" + Thread.currentThread().getName() + "]";
+}
+}.logger(RemoteLogReader.class);
+}
+
+@Override
+public Void call() {
+RemoteLogReadResult result;
+try {
+logger.debug("Reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
+
+FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+result = new RemoteLogReadResult(Optional.of(fetchDataInfo), 
Optional.empty());
+} catch (OffsetOutOfRangeException e) {
+result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+} catch (Exception e) {
+logger.error("Error occurred while reading the remote data for 
{}", fetchInfo.topicPartition, e);
+result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+}
+
+logger.debug("Finished reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);

Review Comment:
   Should we report the offset-out-of-range error here when applicable?



##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static 

[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-05-03 Thread via GitHub


vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1532658288

   Thanks @sambhav-jain-16. One thing I noticed is that the 
`readEndOffsets` method uses `IsolationLevel.READ_UNCOMMITTED` when reading 
the messages, while the test uses `read_committed`. Ideally, read_uncommitted 
should yield more records than read_committed, so this is a bit weird. Can you 
try setting `readEndOffsets` to use read_committed and see whether that shows the same 
behaviour as you described above?





[jira] [Updated] (KAFKA-14909) KRaft Controllers not setting ZkMigrationReady tagged field

2023-05-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14909:
--
Fix Version/s: (was: 3.4.1)

> KRaft Controllers not setting ZkMigrationReady tagged field
> ---
>
> Key: KAFKA-14909
> URL: https://issues.apache.org/jira/browse/KAFKA-14909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.5.0
>
>
> When sending ApiVersionsResponse to other controllers, the KRaft controller 
> is not setting the ZkMigrationReady field. This means, we can't determine if 
> the full KRaft quorum has been properly configured for a migration before 
> triggering the migration.
> As a result, we could start the migration on controller A (which was properly 
> configured), then fail over to controller B (which was not properly 
> configured) and no longer be in dual-write mode.
> The fix is to properly set the ZkMigrationReady tagged field, and to make use 
> of it in KRaftMigrationDriver





[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing

2023-05-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718841#comment-17718841
 ] 

Mickael Maison commented on KAFKA-14957:


The configuration docs on the website: 
https://kafka.apache.org/documentation/#streamsconfigs_state.dir

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>  Labels: beginner, newbie
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.
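
As a rough illustration of why the documented value varies per machine, the 
sketch below builds a path the way the description suggests. It assumes the 
default is the JVM temp directory followed by a `kafka-streams` directory 
name; `StateDirSketch` is a hypothetical class, not the actual StreamsConfig 
code.

```java
import java.io.File;

final class StateDirSketch {
    // Assumed shape of the default: <java.io.tmpdir><separator>kafka-streams
    static String defaultStateDir() {
        return System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
    }

    public static void main(String[] args) {
        // The printed path depends on the OS, the user and the JVM settings.
        System.out.println(defaultStateDir());
    }
}
```

If `java.io.tmpdir` itself ends with a separator, this concatenation produces 
a doubled separator, which would be consistent with the `T//kafka-streams` 
seen in the documented value.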





[jira] [Updated] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-03 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-13891:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.6.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419].
> However, the previous PR forgot to reset the generation when sync group fails with 
> a rebalanceInProgress error. So the previous bug still exists, and it may cause 
> the consumer to go through many rounds of rebalance before becoming stable.
> Here's the example ({*}bold is added{*}):
>  # Consumer A joined and synced the group successfully with generation 1 *(with 
> ownedPartition P1/P2)*.
>  # A new rebalance started with generation 2. Consumer A joined successfully 
> but, for some reason, did not send out sync group immediately.
>  # The other consumers completed sync group successfully in generation 2; only 
> consumer A did not.
>  # After consumer A sent out sync group, the new rebalance started with 
> generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group 
> response.
>  # On receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1.
>  # So we have now sent an out-of-date ownedPartition, which leads to unexpected 
> results.
>  # *After the generation-3 rebalance, consumer A got partitions P3/P4. The 
> ownedPartition is ignored because it belongs to an old generation.*
>  # *Consumer A revokes P1/P2 and re-joins to start a new round of rebalance.*
>  # *If some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again and results in many rounds of 
> rebalance before the group is stable.*
>  
>  





[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718826#comment-17718826
 ] 

Mickael Maison commented on KAFKA-13891:


We are past code freeze for 3.5 so moving this to the next release.

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419].
> However, the previous PR forgot to reset the generation when sync group fails with 
> a rebalanceInProgress error. So the previous bug still exists, and it may cause 
> the consumer to go through many rounds of rebalance before becoming stable.
> Here's the example ({*}bold is added{*}):
>  # Consumer A joined and synced the group successfully with generation 1 *(with 
> ownedPartition P1/P2)*.
>  # A new rebalance started with generation 2. Consumer A joined successfully 
> but, for some reason, did not send out sync group immediately.
>  # The other consumers completed sync group successfully in generation 2; only 
> consumer A did not.
>  # After consumer A sent out sync group, the new rebalance started with 
> generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group 
> response.
>  # On receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1.
>  # So we have now sent an out-of-date ownedPartition, which leads to unexpected 
> results.
>  # *After the generation-3 rebalance, consumer A got partitions P3/P4. The 
> ownedPartition is ignored because it belongs to an old generation.*
>  # *Consumer A revokes P1/P2 and re-joins to start a new round of rebalance.*
>  # *If some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again and results in many rounds of 
> rebalance before the group is stable.*
>  
>  
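The sequence above can be sketched as a toy model. This is not Kafka's coordinator or assignor code: the class and function names are hypothetical, consumer B and its partition P5 are invented for illustration, and the rule "only claims from the newest generation are honored" is an assumption that mirrors the description's "ownedPartition is ignored because of old generation".

```python
from dataclasses import dataclass


@dataclass
class JoinRequest:
    member: str
    generation: int  # generation in which owned_partitions was assigned
    owned_partitions: frozenset = frozenset()


def honored_claims(requests):
    """Keep ownership claims only from the newest generation seen (assumed rule)."""
    newest = max(r.generation for r in requests)
    return {
        r.member: (r.owned_partitions if r.generation == newest else frozenset())
        for r in requests
    }


def partitions_to_revoke(owned, assigned):
    """Cooperative rebalancing: give up what is owned but no longer assigned."""
    return owned - assigned


# Consumer A re-joins for generation 3 still carrying its generation-1 claim.
a = JoinRequest("A", generation=1, owned_partitions=frozenset({"P1", "P2"}))
# Hypothetical consumer B synced in generation 2, so its claim is current.
b = JoinRequest("B", generation=2, owned_partitions=frozenset({"P5"}))

claims = honored_claims([a, b])

# A's stale claim is dropped, so the assignor may give A different partitions.
# P3/P4 is the outcome stated in the issue description.
new_assignment_for_a = frozenset({"P3", "P4"})
revoked = partitions_to_revoke(a.owned_partitions, new_assignment_for_a)

# A non-empty revocation forces A to re-join, i.e. one more rebalance round.
needs_another_rebalance = bool(revoked)
```

In this model, A's stale claim is discarded, A must revoke P1/P2, and the group goes through an extra round, which corresponds to steps 7 and 8 of the description.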





[jira] [Commented] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2023-05-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718824#comment-17718824
 ] 

Mickael Maison commented on KAFKA-13421:


We are past code freeze for 3.5 so moving to the next release.

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Philip Nee
>Priority: Blocker
> Fix For: 3.5.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}





[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2023-05-03 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-13421:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Philip Nee
>Priority: Blocker
> Fix For: 3.6.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}





[jira] [Updated] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

2023-05-03 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-12319:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Flaky test 
> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> -
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> I've seen this test fail a few times locally. But recently I saw it fail on a 
> PR build on Jenkins.
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
> Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 
> sec) ==> expected: <30.0> but was: <37.436825357209706>
>  
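The numbers in the error message can be checked by hand. The sketch below only reproduces the arithmetic; the function names are hypothetical, and the inclusive comparison against the tolerance is an assumption about how "30 +- 7" is evaluated, not the test's actual assertion code.

```python
def observed_rate(connections, elapsed_sec):
    """Connections accepted per second over the measured interval."""
    return connections / elapsed_sec


def within_tolerance(actual, expected, epsilon):
    """True when actual lies in [expected - epsilon, expected + epsilon]."""
    return abs(actual - expected) <= epsilon


# Values taken from the error message: 600 connections in 16.027 seconds,
# expected rate 30 with a tolerance of 7.
rate = observed_rate(600, 16.027)
ok = within_tolerance(rate, expected=30.0, epsilon=7.0)

# How far the observed rate lands above the upper bound of 37.
overshoot = rate - (30.0 + 7.0)
```

Under these assumptions the observed rate is about 37.44 connections per second, which exceeds the upper bound of 37 by roughly 0.44, so the failure is a narrow miss rather than a gross violation of the limit.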





[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

2023-05-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718823#comment-17718823
 ] 

Mickael Maison commented on KAFKA-12319:


We are past code freeze for 3.5 so moving this to the next release.

> Flaky test 
> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> -
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> I've seen this test fail a few times locally. But recently I saw it fail on a 
> PR build on Jenkins.
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
> Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 
> sec) ==> expected: <30.0> but was: <37.436825357209706>
>  




