[jira] [Assigned] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16660:
--

Assignee: Chia-Ping Tsai

> reduce the check interval to speedup DelegationTokenRequestsTest
> 
>
> Key: KAFKA-16660
> URL: https://issues.apache.org/jira/browse/KAFKA-16660
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> the check interval is 1 minute 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
>  and `DelegationTokenRequestsTest` wait 2 minutes before running the check 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
>  ...
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16660:
--

 Summary: reduce the check interval to speedup 
DelegationTokenRequestsTest
 Key: KAFKA-16660
 URL: https://issues.apache.org/jira/browse/KAFKA-16660
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


the check interval is 1 minute 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
 and `DelegationTokenRequestsTest` wait 2 minutes before running the check 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
 ...

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16660:
---
Description: 
the check interval is 1 minute 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
 and `DelegationTokenRequestsTest` waits 2 minutes before running the check 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
 ...

 

  was:
the check interval is 1 minute 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
 and `DelegationTokenRequestsTest` wait 2 minutes before running the check 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
 ...

 


> reduce the check interval to speedup DelegationTokenRequestsTest
> 
>
> Key: KAFKA-16660
> URL: https://issues.apache.org/jira/browse/KAFKA-16660
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> the check interval is 1 minute 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
>  and `DelegationTokenRequestsTest` waits 2 minutes before running the check 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
>  ...
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16661) add a lower `log.initial.task.delay.ms` value to integration test framework

2024-05-03 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16661:
-

 Summary: add a lower `log.initial.task.delay.ms` value to 
integration test framework
 Key: KAFKA-16661
 URL: https://issues.apache.org/jira/browse/KAFKA-16661
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen


After KAFKA-16552, we created an internal config `log.initial.task.delay.ms` to 
control the initial task delay in log manager. This ticket follows it up, to 
set a default low value (100ms, 500ms maybe?) for it, to speed up the tests.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15847:
URL: https://github.com/apache/kafka/pull/15847#discussion_r1588894836


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -252,14 +252,16 @@ public TargetAssignmentResult build() throws 
PartitionAssignorException {
 if (updatedMemberOrNull == null) {
 memberSpecs.remove(memberId);
 } else {
-ConsumerGroupMember member = members.get(memberId);
-Assignment assignment;
+Assignment assignment = 
targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
 // A new static member joins and needs to replace an existing 
departed one.
-if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
-assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);
-} else {
-assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+if (updatedMemberOrNull.instanceId() != null) {
+String previousMemberId = 
staticMembers.get(updatedMemberOrNull.instanceId());
+if (previousMemberId != null) {
+assignment = 
targetAssignment.getOrDefault(updatedMemberOrNull.instanceId(), 
Assignment.EMPTY);

Review Comment:
   Good catch! The unit actually worked because it uses the same id for both 
the member id and the static id. I have updated updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group [kafka]

2024-05-03 Thread via GitHub


dajac commented on PR #15847:
URL: https://github.com/apache/kafka/pull/15847#issuecomment-2092538851

   @dongnuo123 @chia7712 Thanks for your comments. I have addressed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-05-03 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2092565347

   Thanks @C0urante , good catch, yeah those are missing. I have modified some 
of the tests to consider all the 3 types of offset records. I added another 
test for the case when write to secondary store times out for regular offsets. 
Let me know if these look ok coverage-wise. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Accept only one task per exception in queue for failed tasks [kafka]

2024-05-03 Thread via GitHub


cadonna merged PR #15849:
URL: https://github.com/apache/kafka/pull/15849


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Accept only one task per exception in queue for failed tasks [kafka]

2024-05-03 Thread via GitHub


cadonna commented on PR #15849:
URL: https://github.com/apache/kafka/pull/15849#issuecomment-2092582390

   Failures are unrelated!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest

2024-05-03 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843155#comment-17843155
 ] 

Johnny Hsu commented on KAFKA-16174:


[~apoorvmittal10] may I know if you are working on this ticket? if not I am 
willing to help :) 

> Flaky test: testDescribeQuorumStatusSuccessful – 
> org.apache.kafka.tools.MetadataQuorumCommandTest
> -
>
> Key: KAFKA-16174
> URL: https://issues.apache.org/jira/browse/KAFKA-16174
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/]
>  
> {code:java}
> Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> Received a fatal error while waiting for the controller to acknowledge that 
> we are caught upStacktracejava.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Received a fatal error while waiting for the 
> controller to acknowledge that we are caught up at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421)  
> at 
> kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-03 Thread via GitHub


lucasbru commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2092616162

   Is there any problem if we leave 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks`? It clearly needs the timer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14401: Fail kafka log read end requests if underneath work thread is dead [kafka]

2024-05-03 Thread via GitHub


vamossagar12 commented on PR #14372:
URL: https://github.com/apache/kafka/pull/14372#issuecomment-2092628110

   hey @C0urante , would you be able to review this PR? This seems to affect 
users from time to time and the effects are pretty bad eventually leading to 
OOM on the affected worker if the workers aren't restarted. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-03 Thread via GitHub


showuon commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1588939433


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +

Review Comment:
   `This is a global limit for all the partitions that are being copied from 
remote storage to local storage.` <-- is it right? Copied from local storage to 
remote storage?



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+KafkaMetric metric = allMetrics.get(quotaMetricName);
+if (metric != null) {
+LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);

Review Comment:
   I'd like to know why we set WARN logs here. It looks to me if we want to 
update quota dynamically, it is expected the metric is already existed, right? 
If so, I don't think this is unexpected. So maybe INFO or DEBUG level, WDYT?



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *ht

Re: [PR] [1/N] ConfigCommandTest rewritten in java [kafka]

2024-05-03 Thread via GitHub


nizhikov commented on PR #15850:
URL: https://github.com/apache/kafka/pull/15850#issuecomment-2092651643

   CI looks OK for me. PR ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588971961


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
  * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
  */
 // Visible for testing
-public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-long logEndOffset,
-
NavigableMap leaderEpochs) {
+static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata 
segmentMetadata,
+ long logEndOffset,
+ NavigableMap leaderEpochs) {
 long segmentEndOffset = segmentMetadata.endOffset();
 // Filter epochs that does not have any messages/records associated 
with them.
 NavigableMap segmentLeaderEpochs = 
buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
-Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+"Remote segment epochs: {} and partition leader 
epochs: {}",
+segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
+return false;
+}
+// There can be overlapping remote log segments in the remote storage. 
(eg)
+// leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+// segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 
15)}
+// segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 
15), (9, 100)}, after leader-election.

Review Comment:
   OK, the Broker1 having segment size different from broker0 could be because 
the Broker1 has the larger `log.segment.bytes` than Broker0. That makes sense. 
It's quite rare to see, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843171#comment-17843171
 ] 

Kuan Po Tseng commented on KAFKA-16660:
---

I'm willing to work on this item ! May I take over this ? :)

> reduce the check interval to speedup DelegationTokenRequestsTest
> 
>
> Key: KAFKA-16660
> URL: https://issues.apache.org/jira/browse/KAFKA-16660
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> the check interval is 1 minute 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
>  and `DelegationTokenRequestsTest` waits 2 minutes before running the check 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
>  ...
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16660:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> reduce the check interval to speedup DelegationTokenRequestsTest
> 
>
> Key: KAFKA-16660
> URL: https://issues.apache.org/jira/browse/KAFKA-16660
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> the check interval is 1 minute 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
>  and `DelegationTokenRequestsTest` waits 2 minutes before running the check 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
>  ...
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843174#comment-17843174
 ] 

Luke Chen commented on KAFKA-16511:
---

[~ckamal] , I was reading the tickets and your PR, and I still think this issue 
is because the log start offset has incremented in the local 
leader-epoch-checkpoint, but the remote storage is stored the old one. What I 
can see is:

The log segment in the remote storage:
{code:java}
startOffset=2971163, endOffset=2978396, segmentLeaderEpochs={7=2971163}{code}

Current local leader-epoch-checkpoint
{code:java}
$ cat leader-epoch-checkpoint 
0
2
7 2976337
21 2978397{code}

So, the start offset for epoch 7 is now 2976337 ( > 2971163 in remote storage).

 

And we expected the segment can be deleted in `isSegmentBreachByLogStartOffset` 
 [here 
|https://github.com/apache/kafka/blob/240243b91d69c2b65b5e456065fdcce90c121b04/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L903],
 but it won't:

 
{code:java}
Integer firstEpoch = leaderEpochEntries.firstKey();
shouldDeleteSegment = 
metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= 
firstEpoch) && metadata.endOffset() < logStartOffset;{code}
 

The firstEpoch is 7, and the 
`metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= 
firstEpoch)` will be true, but `metadata.endOffset() < logStartOffset` will be 
false (metadata.endOffset(): 2978396 , logStartOffset: 2976337 )

 

That is, this segment is not deleted in isSegmentBreachByLogStartOffset, and 
the latter `isRemoteSegmentWithinLeaderEpochs` will also return false for this 
segment. So, this segment is kept in the remote storage.

 

I think this segment should not be deleted by isSegmentBreachByLogStartOffset 
because the logEndOffset of that segment is still greater than logStartOffset. 
So, we should be able to let this segment be deleted (or first accepted) in the 
following `isRemoteSegmentWithinLeaderEpochs`.

Does that make sense?

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029,

Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-03 Thread via GitHub


ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1589029328


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1202,16 +1202,47 @@ public class StreamsConfig extends AbstractConfig {
 PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }
 
+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
+static {
+final Map tempConsumerDefaultOverrides = new 
HashMap<>();
+
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
+
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
+
+KS_CONTROLLED_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);

Review Comment:
   `KS_DEFAULT_CONSUMER_CONFIGS` Is a map of defaults that Kafka Streams 
prefers. It does not lock these properties. The user can overwrite these 
configs. 
   
   `KS_CONTROLLED_CONSUMER_CONFIGS` and 
`KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED` are the configs we do not allow 
being overwritten. KS will overwrite the user configs in the end



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-03 Thread via GitHub


ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1589029328


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1202,16 +1202,47 @@ public class StreamsConfig extends AbstractConfig {
 PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }
 
+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
+static {
+final Map tempConsumerDefaultOverrides = new 
HashMap<>();
+
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
+
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
+
+KS_CONTROLLED_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);

Review Comment:
   `KS_DEFAULT_CONSUMER_CONFIGS` Is a map of defaults that Kafka Streams 
prefers. It does not lock these properties. The user can overwrite these 
configs. 
   
   `KS_CONTROLLED_CONSUMER_CONFIGS` and 
`KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED` are the configs we do not allow 
being overwritten. KS will overwrite the user configs if they exist to the 
values set in this map



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-03 Thread via GitHub


ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1589029328


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1202,16 +1202,47 @@ public class StreamsConfig extends AbstractConfig {
 PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }
 
+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
+static {
+final Map tempConsumerDefaultOverrides = new 
HashMap<>();
+
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
+
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
+
+KS_CONTROLLED_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);

Review Comment:
   `KS_DEFAULT_CONSUMER_CONFIGS` Is a map of defaults that Kafka Streams 
prefers. It does not lock these properties. The user can overwrite these 
configs. 
   
   `KS_CONTROLLED_CONSUMER_CONFIGS` and 
`KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED` are the configs we do not allow 
being overwritten. KS will overwrite the user configs if they exist to the 
values set in these two maps



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843185#comment-17843185
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~showuon] 

Your analysis is correct. I tried handling this case in the PR. 

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=

[jira] [Created] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-03 Thread Tobias Bohn (Jira)
Tobias Bohn created KAFKA-16662:
---

 Summary: UnwritableMetadataException: Metadata has been lost
 Key: KAFKA-16662
 URL: https://issues.apache.org/jira/browse/KAFKA-16662
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
 Environment: Docker Image (bitnami/kafka:3.7.0)
via Docker Compose
Reporter: Tobias Bohn
 Attachments: log.txt

Hello,
First of all: I am new to this Jira and apologize if anything is set or 
specified incorrectly. Feel free to advise me.

We currently have an error in our test system, which unfortunately I can't 
solve, because I couldn't find anything related to it. No solution could be 
found via the mailing list either.
The error occurs when we want to start up a node. The node runs using Kraft and 
is both a controller and a broker. The following error message appears at 
startup:
{code:java}
kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
error initializing new publishers 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
has been lost because the following could not be represented in metadata 
version 3.5-IV2: the directory assignment state of one or more replicas
kafka  |        at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
kafka  |        at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
kafka  |        at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
kafka  |        at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
kafka  |        at 
org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
kafka  |        at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
kafka  |        at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
kafka  |        at java.base/java.lang.Thread.run(Thread.java:840)
kafka exited with code 0 {code}
We use Docker to operate the cluster. The error occurred while we were trying 
to restart a node. All other nodes in the cluster are still running correctly.
If you need further information, please let us know. The complete log is 
attached to this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15847:
URL: https://github.com/apache/kafka/pull/15847#discussion_r1589049952


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -182,14 +172,16 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
 if (updatedMemberOrNull == null) {
 memberSpecs.remove(memberId);
 } else {
-ConsumerGroupMember member = members.get(memberId);
-Assignment assignment;
+Assignment assignment = 
targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
 // A new static member joins and needs to replace an 
existing departed one.
-if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
-assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);
-} else {
-assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+if (updatedMemberOrNull.instanceId() != null) {

Review Comment:
   Pardon me, why the code in `TargetAssignmentBuilderTestContext#build` are 
similar to `TargetAssignmentBuilder#build` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on PR #15571:
URL: https://github.com/apache/kafka/pull/15571#issuecomment-2092771734

   The QA terminate exceptionally. I have re-triggered the QA 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1589055167


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
  * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
  */
 // Visible for testing
-public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-long logEndOffset,
-
NavigableMap leaderEpochs) {
+static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata 
segmentMetadata,
+ long logEndOffset,
+ NavigableMap leaderEpochs) {
 long segmentEndOffset = segmentMetadata.endOffset();
 // Filter epochs that does not have any messages/records associated 
with them.
 NavigableMap segmentLeaderEpochs = 
buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
-Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+"Remote segment epochs: {} and partition leader 
epochs: {}",
+segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
+return false;
+}
+// There can be overlapping remote log segments in the remote storage. 
(eg)
+// leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+// segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 
15)}
+// segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 
15), (9, 100)}, after leader-election.

Review Comment:
   Given that we roll the log based on `log.roll.ms` and the server startup 
time can vary between the replicas, this is quite common.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589061058


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589059959


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

[PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


cadonna opened a new pull request, #15852:
URL: https://github.com/apache/kafka/pull/15852

   Adds a remove operation to the state updater that returns a future instead 
of adding the removed tasks to an output queue. Code that uses the state 
updater can then wait on the future.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589068864


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -126,9 +126,16 @@ private class EventProcessorThread extends Thread {
 
 private void handleEvents() {
 while (!shuttingDown) {
-recordPollStartTime(time.milliseconds());
+// We use a single meter for aggregate idle percentage for the 
thread pool.
+// Since meter is calculated as total_recorded_value / 
time_window and
+// time_window is independent of the number of threads, each 
recorded idle
+// time should be discounted by # threads.
+
+long idleStartTimeMs = time.milliseconds();
 CoordinatorEvent event = accumulator.take();

Review Comment:
   Yeah, I agree. We should perhaps revert my commit that removed the timeout 
now that we have a use case for it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {

Review Comment:
   Why do we need to synchronize here?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {
+threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment:
   I am confused here. Shouldn't we call `record` on the sensor?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [1/N] ConfigCommandTest rewritten in java [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15850:
URL: https://github.com/apache/kafka/pull/15850#discussion_r1589072485


##
core/src/test/java/kafka/admin/JConfigCommandTest.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ConfigType;
+import org.junit.jupiter.api.Test;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JConfigCommandTest {

Review Comment:
   How about rename it to `ConfigCommandUnitTest`



##
core/src/test/java/kafka/admin/JConfigCommandTest.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ConfigType;
+import org.junit.jupiter.api.Test;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JConfigCommandTest {
+private static final String ZK_CONNECT = "localhost:2181";
+
+private static final List ZOOKEEPER_BOOTSTRAP = 
Arrays.asList("--zookeeper", ZK_CONNECT);
+private static final List BROKER_BOOTSTRAP = 
Arrays.asList("--bootstrap-server", "localhost:9092");
+private static final List CONTROLLER_BOOTSTRAP = 
Arrays.asList("--bootstrap-controller", "localhost:9093");
+
+@Test
+public void shouldExitWithNonZeroStatusOnArgError() {
+assertNonZeroStatusExit("--blah");
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "topics",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "clients",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "ips",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName() {
+assertNonZeroStatusExit(toArray(BROKER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "users",
+"--alter", "--add-config", "consumer_byte_rate=2")));
+}
+
+@Test
+public void s

Re: [PR] MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15847:
URL: https://github.com/apache/kafka/pull/15847#discussion_r1589072912


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -182,14 +172,16 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
 if (updatedMemberOrNull == null) {
 memberSpecs.remove(memberId);
 } else {
-ConsumerGroupMember member = members.get(memberId);
-Assignment assignment;
+Assignment assignment = 
targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
 // A new static member joins and needs to replace an 
existing departed one.
-if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
-assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);
-} else {
-assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+if (updatedMemberOrNull.instanceId() != null) {

Review Comment:
   Yeah, I don't like it too even if I wrote it. 
`TargetAssignmentBuilderTestContext#build` basically generates the expected 
assignment spec that it should receive from `TargetAssignmentBuilder#build` so 
it uses the same logic. This is not ideal. I will try to refactor this in a 
separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-03 Thread via GitHub


cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1589007924


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompletableEventReaperTest {
+
+private final LogContext logContext = new LogContext();
+private final Time time = new MockTime();
+private final CompletableEventReaper> 
reaper = new CompletableEventReaper<>(logContext);
+
+@Test
+public void testExpired() {
+// Add a new event to the reaper.
+Timer timer = time.timer(100);
+UnsubscribeEvent event = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+reaper.add(event);
+
+// Without any time passing, we check the reaper and verify that the 
event is not done amd is still
+// being tracked.
+reaper.reapExpiredAndCompleted(time.milliseconds());
+assertFalse(event.future().isDone());
+assertEquals(1, reaper.size());
+
+// Sleep for at least 1 ms. *more* than the timeout so that the event 
is considered expired.
+time.sleep(timer.timeoutMs() + 1);
+timer.update(time.milliseconds());
+assertEquals(0, timer.remainingMs());
+
+// However, until we actually invoke the reaper, the event isn't 
complete and is still being tracked.
+assertFalse(event.future().isDone());
+assertEquals(1, reaper.size());
+
+// Call the reaper and validate that the event is now "done" 
(expired), the correct exception type is
+// thrown, and the event is no longer tracked.
+reaper.reapExpiredAndCompleted(time.milliseconds());
+assertTrue(event.future().isDone());
+assertThrows(TimeoutException.class, () -> 
ConsumerUtils.getResult(event.future()));
+assertEquals(0, reaper.size());
+}
+
+@Test
+public void testCompleted() {
+// Add a new event to the reaper.
+Timer timer = time.timer(100);
+UnsubscribeEvent event = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+reaper.add(event);
+
+// Without any time passing, we check the reaper and verify that the 
event is not done amd is still
+// being tracked.
+reaper.reapExpiredAndCompleted(time.milliseconds());
+assertFalse(event.future().isDone());
+assertEquals(1, reaper.size());
+
+// We'll cause the event to be completed normally. Note that because 
we haven't called the reaper, the
+// event is still being tracked.
+event.future().complete(null);
+assertTrue(event.future().isDone());
+assertEquals(1, reaper.size());
+
+// To ensure we don't accidentally expire an event that completed 
normally, sleep past the timeout.
+time.sleep(timer.timeoutMs() + 1);
+timer.update(time.milliseconds());
+assertEquals(0, timer.remainingMs());
+
+// Call the reaper and validate that the event is not considered 
expired, but is still no longer tracked.
+reaper.reapExpiredAndCompleted(time.milliseconds());
+assertTrue(event.future().isDone());
+assertNull

Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-03 Thread via GitHub


cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2092808466

   > Is there any problem if we leave 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks` as is? It clearly needs the 
timer
   
   Yeah, I think that we should leave this as it is for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16661) add a lower `log.initial.task.delay.ms` value to integration test framework

2024-05-03 Thread Chia Chuan Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia Chuan Yu reassigned KAFKA-16661:
-

Assignee: Chia Chuan Yu

> add a lower `log.initial.task.delay.ms` value to integration test framework
> ---
>
> Key: KAFKA-16661
> URL: https://issues.apache.org/jira/browse/KAFKA-16661
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Chia Chuan Yu
>Priority: Major
>  Labels: newbie, newbie++
>
> After KAFKA-16552, we created an internal config `log.initial.task.delay.ms` 
> to control the initial task delay in log manager. This ticket follows it up, 
> to set a default low value (100ms, 500ms maybe?) for it, to speed up the 
> tests.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1589109858


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,17 +1305,25 @@ private 
CoordinatorResult consumerGr
 }
 }
 
-// The subscription metadata is updated in two cases:
-// 1) The member has updated its subscriptions;
-// 2) The refresh deadline has been reached.
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
 Map subscribedTopicNamesMap = 
group.subscribedTopicNames();
+SubscriptionType subscriptionType = group.subscriptionType();
+
 if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
 subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
 subscriptionMetadata = group.computeSubscriptionMetadata(
 subscribedTopicNamesMap,
 metadataImage.topics(),
 metadataImage.cluster()
 );
+subscriptionType = ConsumerGroup.subscriptionType(
+subscribedTopicNamesMap,
+group.numMembers()

Review Comment:
   Once https://github.com/apache/kafka/pull/15847 is merged, I think that we 
can easily compute the numMembers as follow:
   
   ```
   int numMembers = group.numMembers();
   if (!group.hasMember(updatedMember.memberId() && !staticMemberReplaced) {
  numMembers++;
   }
   ```
   
   If the member did not exist and we are not replacing a static member, we 
increment by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest

2024-05-03 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu reassigned KAFKA-16174:
--

Assignee: Johnny Hsu

> Flaky test: testDescribeQuorumStatusSuccessful – 
> org.apache.kafka.tools.MetadataQuorumCommandTest
> -
>
> Key: KAFKA-16174
> URL: https://issues.apache.org/jira/browse/KAFKA-16174
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: flaky-test
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/]
>  
> {code:java}
> Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> Received a fatal error while waiting for the controller to acknowledge that 
> we are caught upStacktracejava.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Received a fatal error while waiting for the 
> controller to acknowledge that we are caught up at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421)  
> at 
> kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843204#comment-17843204
 ] 

Luke Chen commented on KAFKA-16511:
---

OK, I'll review it again. Thanks.

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-02T00:43:2

[jira] [Updated] (KAFKA-16595) Introduce ClusterTemplate in ClusterTests

2024-05-03 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng updated KAFKA-16595:
--
Summary: Introduce ClusterTemplate in ClusterTests  (was: Introduce 
template in ClusterTests)

> Introduce ClusterTemplate in ClusterTests
> -
>
> Key: KAFKA-16595
> URL: https://issues.apache.org/jira/browse/KAFKA-16595
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> discussed in https://github.com/apache/kafka/pull/15761#discussion_r1573850549
> Currently we can't apply any template in ClusterTests, thus we have to write 
> down all ClusterConfigProperty in each ClusterTest inside ClusterTests. And 
> that could leave bunch of duplicate code. We need to find a way to reduce the 
> duplicate code. Introduce template in ClusterTests could be a solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]

2024-05-03 Thread via GitHub


FrankYang0529 opened a new pull request, #15853:
URL: https://github.com/apache/kafka/pull/15853

   From KafkaConsumer#position comments, we should return WakeupException when 
KafkaConsumer#wakeup is called.
   
   
https://github.com/apache/kafka/blob/240243b91d69c2b65b5e456065fdcce90c121b04/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1228-L1252
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [1/N] ConfigCommandTest rewritten in java [kafka]

2024-05-03 Thread via GitHub


nizhikov commented on code in PR #15850:
URL: https://github.com/apache/kafka/pull/15850#discussion_r1589172610


##
core/src/test/java/kafka/admin/JConfigCommandTest.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ConfigType;
+import org.junit.jupiter.api.Test;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JConfigCommandTest {

Review Comment:
   renamed



##
core/src/test/java/kafka/admin/JConfigCommandTest.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ConfigType;
+import org.junit.jupiter.api.Test;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JConfigCommandTest {
+private static final String ZK_CONNECT = "localhost:2181";
+
+private static final List ZOOKEEPER_BOOTSTRAP = 
Arrays.asList("--zookeeper", ZK_CONNECT);
+private static final List BROKER_BOOTSTRAP = 
Arrays.asList("--bootstrap-server", "localhost:9092");
+private static final List CONTROLLER_BOOTSTRAP = 
Arrays.asList("--bootstrap-controller", "localhost:9093");
+
+@Test
+public void shouldExitWithNonZeroStatusOnArgError() {
+assertNonZeroStatusExit("--blah");
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "topics",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "clients",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity() {
+assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "ips",
+"--describe")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName() {
+assertNonZeroStatusExit(toArray(BROKER_BOOTSTRAP, Arrays.asList(
+"--entity-type", "users",
+"--alter", "--add-config", "consumer_byte_rate=2")));
+}
+
+@Test
+public void shouldExitWithNonZeroStatusOnBrokerComma

Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-03 Thread via GitHub


soarez commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2092980307

   Thanks for fixing the import. There are some failing tests, please have a 
look.
   
   I think we need to delete 
`kafka.server.KafkaConfigTest#testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage()`.
   There  is also at least another related test failure in `AlterLogDirTest`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15727:
URL: https://github.com/apache/kafka/pull/15727#discussion_r1589172884


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala:
##
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.server.GroupCoordinatorBaseRequestTest
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, 
DescribedGroup, TopicPartitions}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)

Review Comment:
   Hum... It should not work if the new protocol/coordinator is not enabled. 
See 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3814.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-03 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843212#comment-17843212
 ] 

sanghyeok An commented on KAFKA-16657:
--

[~schofielaj] Thanks for your comments. 

Is there a document where that is specified? I tried looking for it in this 
document, but it wasn't there

https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol

> KIP-848 does not work well on Zookeeper Mode
> 
>
> Key: KAFKA-16657
> URL: https://issues.apache.org/jira/browse/KAFKA-16657
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> Hi, Kafka Team.
> I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
> protocol works well in KRaft mode. However, KIP-848 protocol does not work 
> well in `Zookeeper` mode. 
>  
> I have created two versions of docker-compose files for Zookeeper Mode and 
> KRaft Mode. And I tested KIP-848 using the same consumer code and settings.
>  
> In KRaft Mode, the consumer received the assignment correctly. However, an 
> error occurred in Zookeeper Mode.
>  
> *Is KIP-848 supported in Zookeeper mode? or only KRaft is supported?* 
>  
> FYI, This is the code I used.
>  * ZK docker-compose: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
>  * ZK Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
>  * KRaft docker-compose:  
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
>  * KRaft Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
>  * Consumer code: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1589164350


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -399,6 +404,191 @@ class DumpLogSegmentsTest {
 assertEquals(partialBatches, partialBatchesCount)
   }
 
+  @Test
+  def testOffsetsMessageParser(): Unit = {
+val serde = new RecordSerde()
+val parser = new OffsetsMessageParser()
+
+def serializedRecord(key: ApiMessageAndVersion, value: 
ApiMessageAndVersion): Record = {
+  val record = new group.Record(key, value)
+  TestUtils.singletonRecord(key = serde.serializeKey(record), value = 
serde.serializeValue(record))

Review Comment:
   Maybe we can reuse the `singletonRecords`. for example:
   ```scala
   TestUtils.singletonRecords(key = serde.serializeKey(record), value = 
serde.serializeValue(record)).records().iterator().next()
   ```
   
   I try to avoid creating a huge/chaos `TestUtils`, Especially, we just 
cleanup `TestUtils` (#15808).



##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -398,9 +404,141 @@ object DumpLogSegments {
 }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+private val serde = new RecordSerde()
+
+private def prepareKey(message: Message, version: Short): String = {
+  val messageAsJson = message match {
+case m: OffsetCommitKey =>
+  OffsetCommitKeyJsonConverter.write(m, version)
+case m: GroupMetadataKey =>
+  GroupMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupMetadataKey =>
+  ConsumerGroupMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupPartitionMetadataKey =>
+  ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupMemberMetadataKey =>
+  ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupTargetAssignmentMetadataKey =>
+  ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 
version)
+case m: ConsumerGroupTargetAssignmentMemberKey =>
+  ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
+case m: ConsumerGroupCurrentMemberAssignmentKey =>
+  ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 
version)
+case _ => throw new UnknownRecordTypeException(version)
+  }
+
+  val json = new ObjectNode(JsonNodeFactory.instance)
+  json.set("type", new TextNode(version.toString))
+  json.set("data", messageAsJson)
+  json.toString
+}
+
+private def prepareGroupMetadataValue(message: GroupMetadataValue, 
version: Short): JsonNode = {
+  val json = GroupMetadataValueJsonConverter.write(message, version)
+
+  def replace[T](
+node: JsonNode,
+field: String,
+reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
+writer: (T, Short) => JsonNode
+  ): Unit = {
+Option(node.get(field)).foreach { subscription =>

Review Comment:
   We use this function to convert "assignment" too, so the name `subscription` 
is a bit weird to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group [kafka]

2024-05-03 Thread via GitHub


dajac merged PR #15847:
URL: https://github.com/apache/kafka/pull/15847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1589199642


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,17 +1305,25 @@ private 
CoordinatorResult consumerGr
 }
 }
 
-// The subscription metadata is updated in two cases:
-// 1) The member has updated its subscriptions;
-// 2) The refresh deadline has been reached.
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
 Map subscribedTopicNamesMap = 
group.subscribedTopicNames();
+SubscriptionType subscriptionType = group.subscriptionType();
+
 if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
 subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
 subscriptionMetadata = group.computeSubscriptionMetadata(
 subscribedTopicNamesMap,
 metadataImage.topics(),
 metadataImage.cluster()
 );
+subscriptionType = ConsumerGroup.subscriptionType(
+subscribedTopicNamesMap,
+group.numMembers()

Review Comment:
   I just merged it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1589206306


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -399,6 +404,191 @@ class DumpLogSegmentsTest {
 assertEquals(partialBatches, partialBatchesCount)
   }
 
+  @Test
+  def testOffsetsMessageParser(): Unit = {
+val serde = new RecordSerde()
+val parser = new OffsetsMessageParser()
+
+def serializedRecord(key: ApiMessageAndVersion, value: 
ApiMessageAndVersion): Record = {
+  val record = new group.Record(key, value)
+  TestUtils.singletonRecord(key = serde.serializeKey(record), value = 
serde.serializeValue(record))

Review Comment:
   Works for me.



##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -398,9 +404,141 @@ object DumpLogSegments {
 }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+private val serde = new RecordSerde()
+
+private def prepareKey(message: Message, version: Short): String = {
+  val messageAsJson = message match {
+case m: OffsetCommitKey =>
+  OffsetCommitKeyJsonConverter.write(m, version)
+case m: GroupMetadataKey =>
+  GroupMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupMetadataKey =>
+  ConsumerGroupMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupPartitionMetadataKey =>
+  ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupMemberMetadataKey =>
+  ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
+case m: ConsumerGroupTargetAssignmentMetadataKey =>
+  ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 
version)
+case m: ConsumerGroupTargetAssignmentMemberKey =>
+  ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
+case m: ConsumerGroupCurrentMemberAssignmentKey =>
+  ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 
version)
+case _ => throw new UnknownRecordTypeException(version)
+  }
+
+  val json = new ObjectNode(JsonNodeFactory.instance)
+  json.set("type", new TextNode(version.toString))
+  json.set("data", messageAsJson)
+  json.toString
+}
+
+private def prepareGroupMetadataValue(message: GroupMetadataValue, 
version: Short): JsonNode = {
+  val json = GroupMetadataValueJsonConverter.write(message, version)
+
+  def replace[T](
+node: JsonNode,
+field: String,
+reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
+writer: (T, Short) => JsonNode
+  ): Unit = {
+Option(node.get(field)).foreach { subscription =>

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16663) CoordinatorRuntime write timer tasks should be cancelled once HWM advances

2024-05-03 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16663:


 Summary: CoordinatorRuntime write timer tasks should be cancelled 
once HWM advances
 Key: KAFKA-16663
 URL: https://issues.apache.org/jira/browse/KAFKA-16663
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


Otherwise, we pile up the number of timer tasks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843219#comment-17843219
 ] 

Chia-Ping Tsai commented on KAFKA-14579:


After https://github.com/apache/kafka/pull/15652, `DumpLogSegments` only 
depends on `TransactionState`, `TransactionLogKey`, `TransactionLogValue`, and 
`Decoder`. They can be moved to  "transaction-coordinator" module. 
`TransactionState can be rewrite by java enum. Both `TransactionLogKey` and 
`TransactionLogValue` are generated code, so we need to enable 
"transaction-coordinator" to run generation. 

`Decoder` needs to be deprecated, so maybe we should file KIP for it first.





> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16663) CoordinatorRuntime write timer tasks should be cancelled once HWM advances

2024-05-03 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16663:
-
Description: Otherwise, we pile up the number of timer tasks which are 
no-ops if replication was successful. They stay in memory for 15 seconds and as 
the rate of write increases, this may heavily impact memory usage.  (was: 
Otherwise, we pile up the number of timer tasks)

> CoordinatorRuntime write timer tasks should be cancelled once HWM advances
> --
>
> Key: KAFKA-16663
> URL: https://issues.apache.org/jira/browse/KAFKA-16663
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> Otherwise, we pile up the number of timer tasks which are no-ops if 
> replication was successful. They stay in memory for 15 seconds and as the 
> rate of write increases, this may heavily impact memory usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


lucasbru commented on code in PR #15852:
URL: https://github.com/apache/kafka/pull/15852#discussion_r1589251704


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -191,14 +193,20 @@ private void performActionsOnTasks() {
 tasksAndActionsLock.lock();
 try {
 for (final TaskAndAction taskAndAction : getTasksAndActions()) 
{
-final Action action = taskAndAction.getAction();
+final Action action = taskAndAction.action();
 switch (action) {
 case ADD:
-addTask(taskAndAction.getTask());
+addTask(taskAndAction.task());
 break;
 case REMOVE:
-removeTask(taskAndAction.getTaskId());
+if (taskAndAction.futureForRemove() == null) {

Review Comment:
   Do we need the variant without the future?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -496,6 +504,103 @@ private void addTask(final Task task) {
 }
 }
 
+private void removeTask(final TaskId taskId, final 
CompletableFuture future) {
+try {
+if (updatingTasks.containsKey(taskId)) {
+removeUpdatingTask(taskId, future);

Review Comment:
   Could all four `remove...Task` methods just return a boolean if successful?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##
@@ -103,6 +149,17 @@ public String toString() {
  */
 void remove(final TaskId taskId);
 
+/**
+ * Removes a task (active or standby) from the state updater.
+ *
+ * This method does not block until the removed task is removed from the 
state updater. But it returns a future on
+ * which processing can be blocked. The task to remove is removed from the 
updating tasks, paused tasks,
+ * restored tasks, or failed tasks.
+ *
+ * @param taskId ID of the task to remove
+ */
+CompletableFuture removeWithFuture(final TaskId taskId);

Review Comment:
   removeAsync?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-03 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843227#comment-17843227
 ] 

Andrew Schofield commented on KAFKA-16657:
--

I just checked the KIP and I don't think it says explicitly. However, I worked 
on the client for this KIP and it was certainly necessary to enable KRaft for 
all of the tests.

Having just created a ZK broker, I can confirm that a consumer which tries to 
connect with the new protocol fails with 
`org.apache.kafka.common.errors.UnsupportedVersionException`. For one thing, 
this KIP is expected to reach production readiness in AK 4.0 and that's KRaft 
only.

> KIP-848 does not work well on Zookeeper Mode
> 
>
> Key: KAFKA-16657
> URL: https://issues.apache.org/jira/browse/KAFKA-16657
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> Hi, Kafka Team.
> I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
> protocol works well in KRaft mode. However, KIP-848 protocol does not work 
> well in `Zookeeper` mode. 
>  
> I have created two versions of docker-compose files for Zookeeper Mode and 
> KRaft Mode. And I tested KIP-848 using the same consumer code and settings.
>  
> In KRaft Mode, the consumer received the assignment correctly. However, an 
> error occurred in Zookeeper Mode.
>  
> *Is KIP-848 supported in Zookeeper mode? or only KRaft is supported?* 
>  
> FYI, This is the code I used.
>  * ZK docker-compose: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
>  * ZK Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
>  * KRaft docker-compose:  
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
>  * KRaft Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
>  * Consumer code: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589298707


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+
+class ConsumerGroupExecutor {
+
+private ConsumerGroupExecutor() {
+}
+
+static  AutoCloseable buildConsumers(int numberOfConsumers,
+boolean syncCommit,
+String topic,
+Supplier> 
consumerSupplier) {
+List> consumers = IntStream.range(0, 
numberOfConsumers)
+.mapToObj(ignored -> consumerSupplier.get())
+.collect(Collectors.toList());
+
+ExecutorService executor = 
Executors.newFixedThreadPool(consumers.size());
+AtomicBoolean closed = new AtomicBoolean(false);
+final AutoCloseable closeable = () -> releaseConsumers(closed, 
consumers, executor);
+try {
+executor.execute(() -> initConsumer(topic, syncCommit, consumers, 
closed));
+return closeable;
+} catch (Throwable e) {
+Utils.closeQuietly(closeable, "Release Consumer");
+throw e;
+}
+}
+
+private static  void releaseConsumers(AtomicBoolean closed, 
List> consumers, ExecutorService executor) throws 
InterruptedException {
+closed.set(true);
+consumers.forEach(KafkaConsumer::wakeup);
+executor.shutdown();
+executor.awaitTermination(1, TimeUnit.MINUTES);
+}
+
+private static  void initConsumer(String topic, boolean syncCommit, 
List> consumers, AtomicBoolean closed) {
+for (KafkaConsumer consumer : consumers) {

Review Comment:
   We want to make all consumers run concurrently. This makes "one" thread run 
"all" consumers



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,382 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+i

Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589305094


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   > Out of curiosity are you planning later to make this constant a 
configuration?
   
   I think it's unlikely one would need to tweak this often. My experiment ran 
on a fairly busy cluster with ~50k Fetch rps 
(`kafka.network-RequestMetrics-RequestsPerSec(request=Fetch)`) on the broker 
which is reasonably high.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-03 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843231#comment-17843231
 ] 

sanghyeok An commented on KAFKA-16657:
--

Thanks for conforming it!! 

Sorry to say, i'm not familiar with AK 4.0.

Could you give more context about AK? (A... Kafka?) 

> KIP-848 does not work well on Zookeeper Mode
> 
>
> Key: KAFKA-16657
> URL: https://issues.apache.org/jira/browse/KAFKA-16657
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> Hi, Kafka Team.
> I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
> protocol works well in KRaft mode. However, KIP-848 protocol does not work 
> well in `Zookeeper` mode. 
>  
> I have created two versions of docker-compose files for Zookeeper Mode and 
> KRaft Mode. And I tested KIP-848 using the same consumer code and settings.
>  
> In KRaft Mode, the consumer received the assignment correctly. However, an 
> error occurred in Zookeeper Mode.
>  
> *Is KIP-848 supported in Zookeeper mode? or only KRaft is supported?* 
>  
> FYI, This is the code I used.
>  * ZK docker-compose: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
>  * ZK Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
>  * KRaft docker-compose:  
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
>  * KRaft Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
>  * Consumer code: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-03 Thread via GitHub


frankvicky commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589307637


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,382 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private final Iterable groupProtocols;
 
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+this.groupProtocols = cluster.config().serverProper

Re: [PR] KAFKA-15756: [3/3] Migrate testConsumptionWithBrokerFailures to run old protocol in new coordinator [kafka]

2024-05-03 Thread via GitHub


dongnuo123 closed pull request #15547: KAFKA-15756: [3/3] Migrate 
testConsumptionWithBrokerFailures to run old protocol in new coordinator
URL: https://github.com/apache/kafka/pull/15547


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-03 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843236#comment-17843236
 ] 

Andrew Schofield commented on KAFKA-16657:
--

Ah, sorry for the jargon :) Apache Kafka = AK.

The latest release is Apache Kafka 3.7. This release supports KRaft and 
ZooKeeper.

The release under development is Apache Kafka 3.8. This release supports KRaft 
and ZooKeeper, but it's the final release that supports ZooKeeper.

The following one is Apache Kafka 4.0. This release only has KRaft.

This is why some features only work with KRaft. Soon, Kafka will be KRaft only.

> KIP-848 does not work well on Zookeeper Mode
> 
>
> Key: KAFKA-16657
> URL: https://issues.apache.org/jira/browse/KAFKA-16657
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> Hi, Kafka Team.
> I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
> protocol works well in KRaft mode. However, KIP-848 protocol does not work 
> well in `Zookeeper` mode. 
>  
> I have created two versions of docker-compose files for Zookeeper Mode and 
> KRaft Mode. And I tested KIP-848 using the same consumer code and settings.
>  
> In KRaft Mode, the consumer received the assignment correctly. However, an 
> error occurred in Zookeeper Mode.
>  
> *Is KIP-848 supported in Zookeeper mode? or only KRaft is supported?* 
>  
> FYI, This is the code I used.
>  * ZK docker-compose: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
>  * ZK Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
>  * KRaft docker-compose:  
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
>  * KRaft Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
>  * Consumer code: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589336191


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589336191


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+  

Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589339685


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -541,9 +541,17 @@ class KafkaServer(
 }.toMap
 }
 
-val fetchManager = new FetchManager(Time.SYSTEM,
-  new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+// The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
+// for sessionIds falling in [Max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange)
+val sessionIdRange = Int.MaxValue / config.numIoThreads
+val fetchSessionCaches = Range(0, config.numIoThreads)

Review Comment:
   I've used `until` because I needed an exclusive Range



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589342058


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {
+threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment:
   A sensor requires both "total" and "rate" metrics. Since the "total idle 
time" metric is more or less useless, I have removed it. But this requires us 
to directly instantiate the Rate object.
   
   The alternative would be to revert it to a Sensor and also create a "total" 
metric.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589342890


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {

Review Comment:
   See the other comment. It is because we removed the sensor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589342058


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {
+threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment:
   A sensor requires both "total" and "rate" metrics for the Meter object. 
Since the "total idle time" metric is more or less useless, I have removed it. 
But this requires us to directly instantiate the Rate object.
   
   The alternative would be to revert it to a Sensor and also create a "total" 
metric.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16664) Re-add EventAccumulator.take(timeout)

2024-05-03 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16664:


 Summary: Re-add EventAccumulator.take(timeout)
 Key: KAFKA-16664
 URL: https://issues.apache.org/jira/browse/KAFKA-16664
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim


[https://github.com/apache/kafka/pull/15835] should be used with a timeout in 
EventAccumulator#take. We added a commit to remove the timeout, we should 
revert it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-03 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843244#comment-17843244
 ] 

sanghyeok An commented on KAFKA-16657:
--

Thanks for your comments. 

I totally understood (y)

> KIP-848 does not work well on Zookeeper Mode
> 
>
> Key: KAFKA-16657
> URL: https://issues.apache.org/jira/browse/KAFKA-16657
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> Hi, Kafka Team.
> I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
> protocol works well in KRaft mode. However, KIP-848 protocol does not work 
> well in `Zookeeper` mode. 
>  
> I have created two versions of docker-compose files for Zookeeper Mode and 
> KRaft Mode. And I tested KIP-848 using the same consumer code and settings.
>  
> In KRaft Mode, the consumer received the assignment correctly. However, an 
> error occurred in Zookeeper Mode.
>  
> *Is KIP-848 supported in Zookeeper mode? or only KRaft is supported?* 
>  
> FYI, This is the code I used.
>  * ZK docker-compose: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
>  * ZK Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
>  * KRaft docker-compose:  
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
>  * KRaft Result: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
>  * Consumer code: 
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


jeffkbkim commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589348895


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -126,9 +126,16 @@ private class EventProcessorThread extends Thread {
 
 private void handleEvents() {
 while (!shuttingDown) {
-recordPollStartTime(time.milliseconds());
+// We use a single meter for aggregate idle percentage for the 
thread pool.
+// Since meter is calculated as total_recorded_value / 
time_window and
+// time_window is independent of the number of threads, each 
recorded idle
+// time should be discounted by # threads.
+
+long idleStartTimeMs = time.milliseconds();
 CoordinatorEvent event = accumulator.take();

Review Comment:
   Created https://issues.apache.org/jira/browse/KAFKA-16664 to track this. I 
can work on it after this is merged assuming they can be split



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


cadonna commented on code in PR #15852:
URL: https://github.com/apache/kafka/pull/15852#discussion_r1589352418


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -191,14 +193,20 @@ private void performActionsOnTasks() {
 tasksAndActionsLock.lock();
 try {
 for (final TaskAndAction taskAndAction : getTasksAndActions()) 
{
-final Action action = taskAndAction.getAction();
+final Action action = taskAndAction.action();
 switch (action) {
 case ADD:
-addTask(taskAndAction.getTask());
+addTask(taskAndAction.task());
 break;
 case REMOVE:
-removeTask(taskAndAction.getTaskId());
+if (taskAndAction.futureForRemove() == null) {

Review Comment:
   No, we do not but for the sake of breaking down my gigantic PR, I need to 
temporary keep it until one of my future PRs makes it obsolete. It will be 
simply called `remove()` after that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


cadonna commented on code in PR #15852:
URL: https://github.com/apache/kafka/pull/15852#discussion_r1589353331


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -496,6 +504,103 @@ private void addTask(final Task task) {
 }
 }
 
+private void removeTask(final TaskId taskId, final 
CompletableFuture future) {
+try {
+if (updatingTasks.containsKey(taskId)) {
+removeUpdatingTask(taskId, future);

Review Comment:
   I will try that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


cadonna commented on code in PR #15852:
URL: https://github.com/apache/kafka/pull/15852#discussion_r1589354020


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##
@@ -103,6 +149,17 @@ public String toString() {
  */
 void remove(final TaskId taskId);
 
+/**
+ * Removes a task (active or standby) from the state updater.
+ *
+ * This method does not block until the removed task is removed from the 
state updater. But it returns a future on
+ * which processing can be blocked. The task to remove is removed from the 
updating tasks, paused tasks,
+ * restored tasks, or failed tasks.
+ *
+ * @param taskId ID of the task to remove
+ */
+CompletableFuture removeWithFuture(final TaskId taskId);

Review Comment:
   What about `remove()`? 🙂 
   See my comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


AndrewJSchofield commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589348685


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -395,19 +396,27 @@ object FullFetchContext {
   * The fetch context for a full fetch request.
   *
   * @param time   The clock to use.
-  * @param cache  The fetch session cache.
+  * @param caches The fetch session cache shards.
   * @param reqMetadataThe request metadata.
   * @param fetchData  The partition data from the fetch request.
   * @param usesTopicIds   True if this session should use topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
-   private val cache: FetchSessionCache,
+   private val caches: Seq[FetchSessionCache],

Review Comment:
   `cacheShards`?



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, 
id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order 
to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. 
The range for a given shard is [Math.max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange).

Review Comment:
   I know the `[ , )` notation means what you intend for inclusive/exclusive 
bounds, but the presence of the parentheses makes it a bit hard to read I 
think. Maybe using >= and < would be clearer.



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, 
id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order 
to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. 
The range for a given shard is [Math.max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange).
+  * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,

Review Comment:
   Maybe the class name ought to be `FetchSessionCache` shard too. The cache 
really is the whole set of shards.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   I would tend not to add a configuration for this. The value you're talking 
about sounds like it's doing the job on a busy workload, and it's small enough 
that there's negligible benefit of configuring it smaller for a tiny cluster. 
Having a configuration kind of crystallizes this aspect of the internal design 
of Kafka, and you might have an even better idea in the future that would make 
this configuration pointless.



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int,
 }
 
 class FetchManager(private val time: Time,
-   private val cache: FetchSessionCache) extends Logging {
+   private val caches: Seq[FetchSessionCache]) extends Logging 
{
+
+  def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))
+
+  def getShardedCache(sessionId: Int): FetchSessionCache = {

Review Comment:
   `getCacheShard`?



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, 
FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => 
FetchSessionCache.this.size)
-  
metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, 
() => FetchSessionCache.this.totalPartitions)
-  
m

[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16665:
--

 Summary: Fail to get partition's position from within 
onPartitionsAssigned callback in new consumer 
 Key: KAFKA-16665
 URL: https://issues.apache.org/jira/browse/KAFKA-16665
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15853:
URL: https://github.com/apache/kafka/pull/15853#discussion_r1589362449


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration 
timeout) {
 return position.offset;
 
 updateFetchPositions(timer);

Review Comment:
   We need to make sure `updateFetchPositions` also honour the `wakeup`. For 
example, the following test get failed with this PR.
   
   ```java
 @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
 @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
 @Timeout(5)
 def testPositionRespectsWakeup(quorum: String, groupProtocol: String): 
Unit = {
   val topicPartition = new TopicPartition(topic, 15)
   val properties = new Properties();
   properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:12345");
   val consumer = createConsumer(configOverrides = properties)
   consumer.assign(List(topicPartition).asJava)
   
   CompletableFuture.runAsync { () =>
 TimeUnit.SECONDS.sleep(1)
 consumer.wakeup()
   }
   
   assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(100)))
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15853:
URL: https://github.com/apache/kafka/pull/15853#discussion_r1589364098


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration 
timeout) {
 return position.offset;
 
 updateFetchPositions(timer);

Review Comment:
   `properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:12345");` can cause the connection waiting, and that should be 
"wakeupable"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove `ConsumerGroupPartitionMetadataValue.Epoch` field [kafka]

2024-05-03 Thread via GitHub


dajac opened a new pull request, #15854:
URL: https://github.com/apache/kafka/pull/15854

   `ConsumerGroupPartitionMetadataValue.Epoch` is not used anywhere so we can 
remove it. Note that we already have non-backward compatible changes lined up 
for 3.8 so it is fine to do it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843253#comment-17843253
 ] 

Francois Visconte edited comment on KAFKA-16511 at 5/3/24 3:39 PM:
---

[~ckamal] / [~showuon]

I pulled the fix on one of my test cluster and I confirm it fixed the issue I 
had on some partitions. The old segments were cleaned up from remote storage 
too.


was (Author: JIRAUSER288982):
[~ckamal][~showuon]

I pulled the fix on one of my test cluster and I confirm it fixed the issue I 
had on some partitions. The old segments were cleaned up from remote storage 
too.

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843253#comment-17843253
 ] 

Francois Visconte commented on KAFKA-16511:
---

[~ckamal][~showuon]

I pulled the fix on one of my test cluster and I confirm it fixed the issue I 
had on some partitions. The old segments were cleaned up from remote storage 
too.

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=

Re: [PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]

2024-05-03 Thread via GitHub


fvaleri commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1589379241


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -311,25 +311,35 @@  ConsumerRecord parseRecord(Deserializers deserializers,
 Optional leaderEpoch,
 TimestampType timestampType,
 Record record) {
+long offset = record.offset();
+long timestamp = record.timestamp();
+ByteBuffer keyBytes = record.key();
+ByteBuffer valueBytes = record.value();
+Headers headers = new RecordHeaders(record.headers());
+K key;
+V value;
 try {
-long offset = record.offset();
-long timestamp = record.timestamp();
-Headers headers = new RecordHeaders(record.headers());
-ByteBuffer keyBytes = record.key();
-K key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-ByteBuffer valueBytes = record.value();
-V value = valueBytes == null ? null : 
deserializers.valueDeserializer.deserialize(partition.topic(), headers, 
valueBytes);
-return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
-timestamp, timestampType,
-keyBytes == null ? ConsumerRecord.NULL_SIZE : 
keyBytes.remaining(),
-valueBytes == null ? ConsumerRecord.NULL_SIZE : 
valueBytes.remaining(),
-key, value, headers, leaderEpoch);
+key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
 } catch (RuntimeException e) {
-log.error("Deserializers with error: {}", deserializers);
-throw new RecordDeserializationException(partition, 
record.offset(),
-"Error deserializing key/value for partition " + partition 
+
+throw new 
RecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,

Review Comment:
   What about using a helper method like this to throw the exception?
   
   ```sh
   private void 
throwRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin
 origin,
TopicPartition 
partition,
TimestampType 
timestampType,
Record record,
RuntimeException e) {
   throw new RecordDeserializationException(origin, partition, 
record.offset(), record.timestamp(), timestampType, record.key(), 
   record.value(), new RecordHeaders(record.headers()), "Error 
deserializing " + origin.toString() + " for partition " 
   + partition + " at offset " + record.offset() + ". If 
needed, please seek past the record to continue consumption.", e);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15309: part 2-Add custom error handler for UnknownTopicOrPartitionException to Producer [kafka]

2024-05-03 Thread via GitHub


aliehsaeedii opened a new pull request, #15855:
URL: https://github.com/apache/kafka/pull/15855

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16665:
---
Description: 
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

Note that a partition position can be updated from 2 places: call to 
consumer.position or call to consumer.poll. Both will attempt to 
`updateFetchPositions` if there is no valid position yet, but even after having 
a valid position after those calls, the partition will remain non-fetchable 
until the onPartitionsAssigned callback completes (fetchable considers that the 
partitions has a valid position AND is not awaiting the callback)

  

  was:
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  


> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> ---
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as it happens with the legacy 
> consumer, that allows this call. This is actually used from places within 
> Kafka itself (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer is because the partitions that are assigned 
> but awaiting the onPartitionsAssigned callback, are excluded from the list of 
> partitions that should initialize. We should allow the partitions to 
> initialize their positions, without allowing to fetch data from them (which 
> is already achieve based on the isFetchable flag in the subscription state).
> Note that a partition position can be updated from 2 places: call to 
> consumer.position or call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even after 
> having a valid position after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> considers that the partitions has a valid position AND is not awaiting the 
> callback)
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589418655


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, 
id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order 
to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. 
The range for a given shard is [Math.max(1, shardNum * sessionIdRange), 
(shardNum + 1) * sessionIdRange).
+  * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,

Review Comment:
   Thanks for the suggestion! I've renamed the existing type to 
`FetchSessionCacheShard` and `FetchSessionCache` is now essentially a wrapper 
around `Seq[FetchSessionCacheShard]`. This conveys the intention clearly indeed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]

2024-05-03 Thread via GitHub


lianetm commented on PR #15856:
URL: https://github.com/apache/kafka/pull/15856#issuecomment-2093364129

   Hey @lucasbru , could you take a look? 
   
   For more context, we initially decided not to allow initializing positions 
while the onPartitionsAssigned callback was running, concerned about a user 
trying to seek in the callback, and us initializing to a different position 
within the poll, but that shouldn't happen because we execute the callbacks in 
the foreground thread, so poll will be blocked while the onPartitionsAssigned 
completes (so user's seek would happen, callback completes, then next poll). 
Actually, with the connect test we realized that we do need to allow to 
initialize positions, because a user may want to call .position() from within 
the callback, like the WorkerSinkTask does (that sys test passes locally now 
with this fix)
   
   Then, this [PR](https://github.com/apache/kafka/pull/15724) was after a 
symptom of this same problem, but I would say we put the fix in the wrong place 
by not allowing reset, when in reality the issue was that we were not allowing 
to initialize the position in the first place. This is why with this current PR 
I'm kind of reverting that other PR comments, and part of the logic (not all , 
mainly to keep some refactoring it had).
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Add remove operation with future to state updater [kafka]

2024-05-03 Thread via GitHub


cadonna commented on code in PR #15852:
URL: https://github.com/apache/kafka/pull/15852#discussion_r1589452435


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -496,6 +504,103 @@ private void addTask(final Task task) {
 }
 }
 
+private void removeTask(final TaskId taskId, final 
CompletableFuture future) {
+try {
+if (updatingTasks.containsKey(taskId)) {
+removeUpdatingTask(taskId, future);

Review Comment:
   Done! Code looks better now. Thanks a lot!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589476625


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

Re: [PR] KAFKA-16655: Deflaking ZKMigrationIntegrationTest.testDualWrite [kafka]

2024-05-03 Thread via GitHub


ahuang98 commented on PR #15845:
URL: https://github.com/apache/kafka/pull/15845#issuecomment-2093452127

   test failures look unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16655: Deflaking ZKMigrationIntegrationTest.testDualWrite [kafka]

2024-05-03 Thread via GitHub


cmccabe merged PR #15845:
URL: https://github.com/apache/kafka/pull/15845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-03 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2093513532

   So many conflicts 😭 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-16474:
--

Assignee: Philip Nee  (was: Lianet Magrans)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14995: Automate asf.yaml collaborators refresh [kafka]

2024-05-03 Thread via GitHub


stevenbooke closed pull request #13842: KAFKA-14995: Automate asf.yaml 
collaborators refresh
URL: https://github.com/apache/kafka/pull/13842


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15835:
URL: https://github.com/apache/kafka/pull/15835#discussion_r1589614396


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
 public void recordEventQueueProcessingTime(long durationMs) { }
 
 @Override
-public void recordThreadIdleRatio(double ratio) {
-threadIdleRatioSensor.record(ratio);
+public synchronized void recordThreadIdleTime(long idleTimeMs, long 
currentTimeMs) {
+threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment:
   Hum… Can’t we really use a Rate in a Sensor? This seems strange to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843318#comment-17843318
 ] 

Lianet Magrans commented on KAFKA-16474:


Hey [~pnee], this is the PR https://github.com/apache/kafka/pull/15723 from 
[~kirktrue] that fixes a bug that for sure was leading to double heartbeats (I 
guess it's what you were seeing here, to double check)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843320#comment-17843320
 ] 

Philip Nee commented on KAFKA-16474:


Hey [~lianetm]  - i'm planning to verify the fix and close out the issue.

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-05-03 Thread via GitHub


Alexander-Aghili commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2093622997

   Ok will do. Ill link it here when done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843324#comment-17843324
 ] 

Lianet Magrans commented on KAFKA-16474:


Yes, makes sense, that PR is addressing only point 1. of what you reported, so 
we still have the point about the poll frequency (maybe to address separately 
if we still believe it's a concern) 

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843326#comment-17843326
 ] 

Philip Nee commented on KAFKA-16474:


Yeah I think we still need to figure why network thread isn't backing off for 
most of the polls.

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-03 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589652714


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private final Iterable groupProtocols;
 
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+this.groupProtocols = cluster.config().serverPropertie

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-03 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589657729


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private final Iterable groupProtocols;
 
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+this.groupProtocols = cluster.config().serverPropertie

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-03 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589652714


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
-}
-
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String missingGroup = "missing.group";
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private final Iterable groupProtocols;
 
-String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+this.groupProtocols = cluster.config().serverPropertie

Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-03 Thread via GitHub


hachikuji commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1589667403


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##
@@ -216,4 +140,132 @@ private void appendBatches(List> 
batches) {
 batches.forEach(CompletedBatch::release);
 }
 }
+
+final public static class Builder {
+private long lastContainedLogTimestamp = 0;
+private CompressionType compressionType = CompressionType.NONE;
+private Time time = Time.SYSTEM;
+private int maxBatchSize = 1024;
+private MemoryPool memoryPool = MemoryPool.NONE;
+private short kraftVersion = 0;

Review Comment:
   Why would we want most tests to use an older version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >