[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-26 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820943#comment-17820943
 ] 

Luke Chen commented on KAFKA-16282:
---

[~ahmedsobeh], thanks for the draft KIP. I think you can refer to 
[KIP-1005|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset]
 for how to write a similar KIP. Other than that, you can directly start a 
discussion thread on the dev mailing list, as Justine suggested.

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time <timestamp / -1 or latest / -2 or earliest / -3 or max-timestamp /
>         -4 or earliest-local / -5 or latest-tiered>
>     timestamp of the offsets before that. [Note: No offset is returned,
>     if the timestamp greater than recently committed record timestamp is
>     given.] (default: latest)
> {code}
> For the "latest" option, it'll always return the "high watermark" because we 
> always send the request with the default option *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command could also support getting the last stable offset 
> (LSO) for transactional use cases, i.e., sending the request with 
> *IsolationLevel.READ_COMMITTED*.
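
For illustration, here is a minimal Java Admin-client sketch of the requested behavior: asking for the latest offset under READ_COMMITTED yields the LSO rather than the high watermark. This is an editorial sketch, not part of the ticket; the bootstrap address and topic are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class LsoSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic
            // READ_COMMITTED makes the broker return the last stable offset (LSO);
            // the default READ_UNCOMMITTED returns the high watermark instead.
            ListOffsetsResult result = admin.listOffsets(
                Collections.singletonMap(tp, OffsetSpec.latest()),
                new ListOffsetsOptions(IsolationLevel.READ_COMMITTED));
            System.out.println("LSO: " + result.partitionResult(tp).get().offset());
        }
    }
}
{code}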



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


ableegoldman merged PR #15424:
URL: https://github.com/apache/kafka/pull/15424


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


ableegoldman commented on PR #15424:
URL: https://github.com/apache/kafka/pull/15424#issuecomment-1965693809

   Looks like a new build was triggered when I renamed the PR but I have the 
results from before that and it looked good. Test failures all unrelated. Will 
merge this to trunk
   
   Build from before the name change: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15424/2/tests/





Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


ableegoldman commented on PR #15424:
URL: https://github.com/apache/kafka/pull/15424#issuecomment-1965688725

   remember to name your PRs correctly -- I added the `KAFKA-15215` prefix 
since this came about from that ticket, but you can always use `MINOR:` for 
small fixes that don't have a corresponding ticket





Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


agavra commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1503563459


##
build.gradle:
##
@@ -2333,6 +2333,8 @@ project(':streams:streams-scala') {
 
 testImplementation libs.junitJupiter
 testImplementation libs.easymock
+testImplementation libs.mockitoCore

Review Comment:
   I checked; it looks like no tests in the streams-scala module use Mockito yet.






Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


ableegoldman commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1503562979


##
build.gradle:
##
@@ -2333,6 +2333,8 @@ project(':streams:streams-scala') {
 
 testImplementation libs.junitJupiter
 testImplementation libs.easymock
+testImplementation libs.mockitoCore

Review Comment:
   Wait...why do we need to add a dependency for this? I thought the whole 
point of the change was to align this test with others in the same file -- 
shouldn't that mean whatever dependencies are needed are already there?






[jira] [Updated] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.

2024-02-26 Thread Jialun Peng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jialun Peng updated KAFKA-16102:

Description: 
When I dynamically modify the parameters related to Kafka listeners, such as 
changing the IP or port value of a listener, the dynamic parameters under the 
corresponding path in ZooKeeper are updated. However, the modified IP or port 
for the corresponding listener does not actually take effect. This phenomenon 
occurs consistently. As a slight improvement, the error "Security protocol 
cannot be updated for existing listener" will be eliminated.

 

  was:When I dynamically modify the parameters related to Kafka listeners, such 
as changing the IP or port value of a listener, the dynamic parameters under 
the corresponding path in ZooKeeper are updated. However, the modified IP or 
port for the corresponding listener does not actually take effect. This 
phenomenon occurs consistently. As a slight improvement, the error "Security 
protocol cannot be updated for existing listener" will be eliminated.


> about DynamicListenerConfig, the dynamic modification of the listener's port 
> or IP does not take effect.
> 
>
> Key: KAFKA-16102
> URL: https://issues.apache.org/jira/browse/KAFKA-16102
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.6.0
> Environment: Reproducible in any environment.
>Reporter: Jialun Peng
>Assignee: Jialun Peng
>Priority: Minor
>  Labels: easyfix
> Fix For: 3.8.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> When I dynamically modify the parameters related to Kafka listeners, such as 
> changing the IP or port value of a listener, the dynamic parameters under the 
> corresponding path in ZooKeeper are updated. However, the modified IP or port 
> for the corresponding listener does not actually take effect. This phenomenon 
> occurs consistently. As a slight improvement, the error "Security protocol 
> cannot be updated for existing listener" will be eliminated.
>  
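
For reference, a minimal sketch of the kind of dynamic update being described, using the Java Admin client. This is an editorial illustration, not the reporter's code; the broker id, listener value, and bootstrap address are placeholders.

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicListenerUpdateSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Per-broker dynamic config for broker id 0 (placeholder).
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            // Attempt to move the listener to a new port. Per the report, the
            // value stored in ZooKeeper updates, but the socket never rebinds.
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("listeners", "PLAINTEXT://0.0.0.0:9093"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(broker, Collections.singletonList(op));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}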





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-26 Thread via GitHub


showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1965678417

   @clolov, do we have any updates on the compilation error fix? 





Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


hachikuji merged PR #15385:
URL: https://github.com/apache/kafka/pull/15385





Re: [PR] KAFKA-16305: Avoid optimisation in handshakeUnwrap [kafka]

2024-02-26 Thread via GitHub


gaurav-narula commented on code in PR #15434:
URL: https://github.com/apache/kafka/pull/15434#discussion_r1503432226


##
clients/src/test/java/org/apache/kafka/common/security/ssl/NettySslEngineFactory.java:
##
@@ -0,0 +1,159 @@
+package org.apache.kafka.common.security.ssl;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NettySslEngineFactory extends DefaultSslEngineFactory {

Review Comment:
    Reverted that commit and added the unit test in 79902f5






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1503404075


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
   It doesn't seem like other configs in KafkaConfig get their own "Config" 
class, and the name made me think it would hold several configs. How about 
`GroupProtocolMigrationPolicy` and `group.protocol.migration.policy`? Or we can 
stick with `group.protocol.migration` and go with `GroupProtocolMigration`.
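
   For illustration only, a sketch of the enum shape implied by the imports above; the values and the parsing helper are assumptions for this sketch, not the actual patch:

   ```java
   import java.util.Arrays;
   import java.util.Locale;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   public enum GroupProtocolMigrationPolicy {
       // Hypothetical values for this sketch.
       UPGRADE, DOWNGRADE, DISABLED;

       private static final Map<String, GroupProtocolMigrationPolicy> NAME_TO_ENUM =
           Arrays.stream(values()).collect(
               Collectors.toMap(p -> p.name().toLowerCase(Locale.ROOT), Function.identity()));

       // Parses a case-insensitive config value such as "upgrade".
       public static GroupProtocolMigrationPolicy parse(String name) {
           GroupProtocolMigrationPolicy policy = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
           if (policy == null) {
               throw new IllegalArgumentException("Unknown group protocol migration policy: " + name);
           }
           return policy;
       }
   }
   ```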






Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15432:
URL: https://github.com/apache/kafka/pull/15432#discussion_r1503408163


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1731,15 +1735,19 @@ public void onNewMetadataImage(
 // Push an event for each coordinator.
 coordinators.keySet().forEach(tp -> {
 scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + 
newImage.offset() + ")", tp, () -> {
-withContextOrThrow(tp, context -> {
-if (context.state == CoordinatorState.ACTIVE) {
+CoordinatorContext context = coordinators.get(tp);
+if (context != null && context.state == 
CoordinatorState.ACTIVE) {

Review Comment:
   I noticed there are still some places where we don't want the context to be 
null, i.e. loading the partition. (And for all the run methods we want the 
coordinator to be active.) Is the general rationale for this change that not 
having the context/active state is ok if we expect the coordinator moved? Is it 
possible we could try to load and the coordinator moves again? Just trying 
to understand the cases where we accept null or non-active coordinators vs. 
when we throw errors.






[PR] MINOR: Add 3.7.0 to system tests [kafka]

2024-02-26 Thread via GitHub


stanislavkozlovski opened a new pull request, #15436:
URL: https://github.com/apache/kafka/pull/15436

   As per the [release 
instructions](https://cwiki.apache.org/confluence/display/KAFKA/Release+Process),
 bumping the versions in the associated files here as part of the 3.7 release





Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503400099


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws 
RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block 
indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue<T> queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the 
provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.
  *
- * @param timeout   The timeout.
- * @param unit  The timeout unit.
  * @return The next event available or null.
  */
-public T poll(long timeout, TimeUnit unit) {
+public T take() {
 lock.lock();
 try {
 K key = randomKey();
-long nanos = unit.toNanos(timeout);
-while (key == null && !closed && nanos > 0) {
+while (key == null && !closed) {
 try {
-nanos = condition.awaitNanos(nanos);

Review Comment:
   I think the idea was to simplify it. Since nanos was always 0 or max value, 
there's no need to specify specific values. Just have two methods.
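
   For readers skimming the diff, here is a minimal self-contained sketch of the two-method pattern being discussed: a non-blocking `poll()` plus a blocking `take()` that uses `Condition.await()` in place of `awaitNanos(Long.MAX_VALUE)`. This is an illustration with simplified types, not the actual `EventAccumulator`:

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   class SimpleAccumulator<T> {
       private final ReentrantLock lock = new ReentrantLock();
       private final Condition condition = lock.newCondition();
       private final Queue<T> events = new ArrayDeque<>();
       private boolean closed = false;

       // Non-blocking: return an event if one is ready, otherwise null.
       T poll() {
           lock.lock();
           try {
               return events.poll();
           } finally {
               lock.unlock();
           }
       }

       // Blocking: await() with no timeout replaces awaitNanos(Long.MAX_VALUE).
       T take() throws InterruptedException {
           lock.lock();
           try {
               while (events.isEmpty() && !closed) {
                   condition.await();
               }
               return events.poll();
           } finally {
               lock.unlock();
           }
       }

       void add(T event) {
           lock.lock();
           try {
               events.add(event);
               condition.signal();
           } finally {
               lock.unlock();
           }
       }

       void close() {
           lock.lock();
           try {
               closed = true;
               condition.signalAll(); // wake any blocked take() calls
           } finally {
               lock.unlock();
           }
       }
   }
   ```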






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503398332


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws 
RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block 
indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue<T> queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the 
provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.
  *
- * @param timeout   The timeout.
- * @param unit  The timeout unit.
  * @return The next event available or null.
  */
-public T poll(long timeout, TimeUnit unit) {
+public T take() {
 lock.lock();
 try {
 K key = randomKey();
-long nanos = unit.toNanos(timeout);
-while (key == null && !closed && nanos > 0) {
+while (key == null && !closed) {
 try {
-nanos = condition.awaitNanos(nanos);

Review Comment:
   It seems to me that the purpose of this PR is to remove this. How much worse 
is using awaitNanos compared to await? I can imagine a subtle impact, but I'd 
like to know your expectation.






Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503396742


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -53,53 +50,19 @@
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-private static class MockEventAccumulator extends 
EventAccumulator {
+private static class DelayEventAccumulator extends 
EventAccumulator {
 private final Time time;
-private final Queue events;
-private final long timeToPollMs;
-private final AtomicBoolean isClosed;
+private final long takeDelayMs;
 
-public MockEventAccumulator(Time time, long timeToPollMs) {
+public DelayEventAccumulator(Time time, long takeDelayMs) {
 this.time = time;
-this.events = new LinkedList<>();
-this.timeToPollMs = timeToPollMs;
-this.isClosed = new AtomicBoolean(false);
+this.takeDelayMs = takeDelayMs;
 }
 
 @Override
-public CoordinatorEvent poll() {
-synchronized (events) {
-while (events.isEmpty() && !isClosed.get()) {
-try {
-events.wait();
-} catch (Exception ignored) {
-
-}
-}
-time.sleep(timeToPollMs);
-return events.poll();
-}
-}
-
-@Override
-public CoordinatorEvent poll(long timeout, TimeUnit unit) {
-return null;
-}
-
-@Override
-public void add(CoordinatorEvent event) throws 
RejectedExecutionException {
-synchronized (events) {
-events.add(event);
-events.notifyAll();
-}
-}
-
-@Override
-public void close() {
-isClosed.set(true);
-synchronized (events) {
-events.notifyAll();
-}
+public CoordinatorEvent take() {

Review Comment:
   thanks for the simplification!






[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820880#comment-17820880
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16277:


FYI I added you as a contributor so you should be able to self-assign tickets 
from now on. Thanks again for contributing a fix!

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour. In which case, what is the 
> preferred workaround for our scenario? Our current workaround if we decide to 
> go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
> consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  
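
For anyone reproducing this, wiring up the assignor looks roughly like the sketch below. This is an editorial illustration; the bootstrap servers and group id are placeholders, not the reporter's setup.

{code:java}
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StickyConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The strategy at the center of this report.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
            // poll loop elided
        }
    }
}
{code}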





[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-16277.

Resolution: Fixed

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>





[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16277:
---
Fix Version/s: 3.8.0

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>





[jira] [Assigned] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16277:
--

Assignee: Cameron Redpath

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>





Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]

2024-02-26 Thread via GitHub


ableegoldman commented on PR #15416:
URL: https://github.com/apache/kafka/pull/15416#issuecomment-1965331000

   Test failures are unrelated, merged to trunk.
   
   Thanks for the fix!





Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]

2024-02-26 Thread via GitHub


ableegoldman merged PR #15416:
URL: https://github.com/apache/kafka/pull/15416





Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]

2024-02-26 Thread via GitHub


ableegoldman commented on code in PR #15416:
URL: https://github.com/apache/kafka/pull/15416#discussion_r1503326278


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -663,6 +663,7 @@ private void assignOwnedPartitions() {
 String consumer = consumerEntry.getKey();

Review Comment:
   Fair enough. Thanks for taking a look.






[PR] MINOR: remove test constructor for PartitionAssignment [kafka]

2024-02-26 Thread via GitHub


cmccabe opened a new pull request, #15435:
URL: https://github.com/apache/kafka/pull/15435

   Remove the test constructor for PartitionAssignment and remove the TODO.





[jira] [Created] (KAFKA-16308) Formatting and Updating Kafka Features

2024-02-26 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16308:
--

 Summary: Formatting and Updating Kafka Features
 Key: KAFKA-16308
 URL: https://issues.apache.org/jira/browse/KAFKA-16308
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan
Assignee: Justine Olshan


As part of KIP-1022, we need to extend the storage and upgrade tools to support 
features other than metadata version. 

See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Formatting+and+Updating+Features





Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on PR #15430:
URL: https://github.com/apache/kafka/pull/15430#issuecomment-1965265194

   Left one question -- otherwise lgtm





Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503291909


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -53,53 +50,19 @@
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-private static class MockEventAccumulator extends 
EventAccumulator {
+private static class DelayEventAccumulator extends 
EventAccumulator {
 private final Time time;
-private final Queue events;
-private final long timeToPollMs;
-private final AtomicBoolean isClosed;
+private final long takeDelayMs;
 
-public MockEventAccumulator(Time time, long timeToPollMs) {
+public DelayEventAccumulator(Time time, long takeDelayMs) {
 this.time = time;
-this.events = new LinkedList<>();
-this.timeToPollMs = timeToPollMs;
-this.isClosed = new AtomicBoolean(false);
+this.takeDelayMs = takeDelayMs;
 }
 
 @Override
-public CoordinatorEvent poll() {
-synchronized (events) {

Review Comment:
   It's interesting that we basically made the real accumulator do what this 
mock was doing. But it also makes sense.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-26 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503266038


##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List<Node> isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List<Node> elr() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List<Node> lastKnownElr() {

Review Comment:
   And likewise here:
   
   ```suggestion
   public List<Node> lastKnownEligibleLeaderReplicas() {
   ```



##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List<Node> isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List<Node> elr() {

Review Comment:
   Would you consider using the full name for the uninitiated?
   
   ```suggestion
   public List<Node> eligibleLeaderReplicas() {
   ```
   
   






[jira] [Commented] (KAFKA-15695) Local log start offset is not updated on the follower after rebuilding remote log auxiliary state

2024-02-26 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820852#comment-17820852
 ] 

Henry Cai commented on KAFKA-15695:
---

Can this fix be backported to the 3.6.x branch?

> Local log start offset is not updated on the follower after rebuilding remote 
> log auxiliary state
> -
>
> Key: KAFKA-15695
> URL: https://issues.apache.org/jira/browse/KAFKA-15695
> Project: Kafka
>  Issue Type: Bug
>  Components: replication, Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Nikhil Ramakrishnan
>Assignee: Nikhil Ramakrishnan
>Priority: Major
>  Labels: KIP-405, tiered-storage
> Fix For: 3.7.0
>
>
> In 3.6, the local log start offset is not updated when reconstructing the 
> auxiliary state of the remote log on a follower.
> The impact of this bug is significant because, if this follower becomes the 
> leader before the local log start offset has had a chance to be updated, 
> reads from any offset between [wrong log start offset; actual log start 
> offset] will be routed on the local storage, which does not contain the 
> corresponding data. Consumer reads will in this case never be satisfied.
>  
> Reproduction case 1:
>  # Create cluster with 2 brokers, broker 0 and broker 1.
>  # Create a topic topicA with RF=1, 1 partition (topicA-0) on broker 0, and 1 
> batch per segment.
>  # Produce 3 records to topicA, such that segment 1 and segment 2 with the 
> first two records are copied to remote and deleted from local storage.
>  # Reassign replica to add broker 1 to the replica set for topicA-0, and 
> elect broker 1 as the leader.
>  # Try to consume from the beginning of topicA-0.
>  
> Reproduction case 2:
>  # Create a cluster with 2 brokers, broker 0 and broker 1.
>  # Create a topic topicA with RF=2, 1 partition (topicA-0) and 2 batches per 
> segment, with broker 0 as the leader.
>  # Stop broker 1, and produce 3 records to topicA, such that segment 1 with 
> the first two records are copied to remote and deleted from local storage.
>  # Start broker 1, let it catch up with broker 0.
>  # Stop broker 0 such that broker 1 is elected as the leader, and try to 
> consume from the beginning of topicA-0.
> Consumer read will not be satisfied in these cases because the local log 
> start offset is not updated on broker 1 when it builds the auxiliary state of 
> the remote log segments.
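
As a rough aid for reproduction case 1, creating the topic could look like the sketch below. This is an editorial illustration; it assumes a cluster already configured for tiered storage, and the config values are only indicative.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTieredTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("remote.storage.enable", "true"); // requires a tiered-storage-enabled cluster
            configs.put("segment.bytes", "1024");         // tiny segments so each batch rolls a segment
            // RF=1, 1 partition, as in reproduction case 1.
            NewTopic topicA = new NewTopic("topicA", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topicA)).all().get();
        }
    }
}
{code}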





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-26 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,25 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+public DescribeTopicsOptions useDescribeTopicsApi(boolean 
useDescribeTopicsApi) {
+this.useDescribeTopicsApi = useDescribeTopicsApi;
+return this;
+}

Review Comment:
   Can we add some comments here for a developer to know _why_ to use the 
topics API or not?



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,36 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl<Boolean> nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+
+public String toString() {
+return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";

Review Comment:
   ```suggestion
   return "RecurringCall(name=" + name + ", deadlineMs=" + 
deadlineMs + ")";
   ```



##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List<Node> isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List<Node> elr() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List<Node> lastKnownElr() {
+return lastKnownElr;
+}
+
 public String toString() {
 return "(partition=" + partition + ", leader=" + leader + ", 
replicas=" +
-Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
")";
+Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   ```suggestion
   Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ", elr=" +
   Utils.join(elr, ", ") + ", lastKnownElr=" + Utils.join(lastKnownElr, ", ") + ")";
   ```



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,36 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl<Boolean> nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+
+public String toString() {
+return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";
+}
+
+public void run() {
+try {
+do {
+nextRun = new KafkaFutureImpl<>();
+Call call = generateCall();
+runnable.call(call, time.milliseconds());
+} while (nextRun.get());
+} catch (Exception e) {
+log.info("Stop the recurring call " + name + " because " + e);

Review Comment:
   Are we specifically wanting to avoid outputting a stack trace?



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
 public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
 if (topics instanceof TopicIdCollection)
 return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-else if (topics instanceof TopicNameCollection)
+else if (topics instanceof TopicNameCollection) {
+if (options.useDescribeTopicsApi()) {
+return DescribeTopicsResult.ofTopicNameIterator(new 
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), 
options));

Review Comment:
   It's been my experience that it's "dangerous"  to run arbitrary user code 
from within the context of the client code. User code can (and will) do 
unpredictable things with state, errors, threads, etc. The surrounding code 
inside the client has to be very careful to make sure it 

[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-26 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820821#comment-17820821
 ] 

Justine Olshan commented on KAFKA-16282:


Thanks [~ahmedsobeh] 

A few things:
For the proposed Admin.java change, pick one of the options and list the 
other as a rejected alternative. We may go with one or the other, but it is 
good to state an opinion for folks to agree or disagree with. 

For the compatibility section, we could have compatibility issues if someone 
uses this new field on a broker that doesn't support it. We should probably 
throw an error in that case. 

Generally, though, I think the KIP is ready for discussion; I may have further 
comments for you on the mailing list :) 

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>





Re: [PR] KAFKA-16305: Avoid optimisation in handshakeUnwrap [kafka]

2024-02-26 Thread via GitHub


ijuma commented on code in PR #15434:
URL: https://github.com/apache/kafka/pull/15434#discussion_r1503149170


##
clients/src/test/java/org/apache/kafka/common/security/ssl/NettySslEngineFactory.java:
##
@@ -0,0 +1,159 @@
+package org.apache.kafka.common.security.ssl;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NettySslEngineFactory extends DefaultSslEngineFactory {

Review Comment:
   I would expect a unit test versus a test that adds a third party dependency.






[jira] [Commented] (KAFKA-9062) Handle stalled writes to RocksDB

2024-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820819#comment-17820819
 ] 

Matthias J. Sax commented on KAFKA-9062:


Given that bulk loading was disabled, should we close this ticket?

> Handle stalled writes to RocksDB
> 
>
> Key: KAFKA-9062
> URL: https://issues.apache.org/jira/browse/KAFKA-9062
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> RocksDB may stall writes at times when background compactions or flushes are 
> having trouble keeping up. This means we can effectively end up blocking 
> indefinitely during a StateStore#put call within Streams, and may get kicked 
> from the group if the throttling does not ease up within the max poll 
> interval.
> Example: when restoring large amounts of state from scratch, we use the 
> strategy recommended by RocksDB of turning off automatic compactions and 
> dumping everything into L0. We do batch somewhat, but do not sort these small 
> batches before loading into the db, so we end up with a large number of 
> unsorted L0 files.
> When restoration is complete and we toggle the db back to normal (not bulk 
> loading) settings, a background compaction is triggered to merge all these 
> into the next level. This background compaction can take a long time to merge 
> unsorted keys, especially when the amount of data is quite large.
> Any new writes while the number of L0 files exceeds the max will be stalled 
> until the compaction can finish, and processing after restoring from scratch 
> can block beyond the polling interval
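
For context, the bulk-loading toggle described above looks roughly like the following RocksJava sketch. This is a simplification for illustration, not Streams' actual restore code; the database path is a placeholder.

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                 .setCreateIfMissing(true)
                 // Bulk-load settings: no automatic compactions, so restore
                 // writes pile up as (possibly unsorted) L0 files.
                 .setDisableAutoCompactions(true)
                 .setLevel0FileNumCompactionTrigger(1 << 30);
             RocksDB db = RocksDB.open(options, "/tmp/bulk-load-demo")) {
            // ... restoration writes would go here ...

            // Standing in for the toggle back to normal settings: force the
            // big merge of L0 files. With many unsorted L0 files, this is the
            // long-running compaction that can stall new writes.
            db.compactRange();
        }
    }
}
{code}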





[jira] [Assigned] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations

2024-02-26 Thread Anton Liauchuk (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Liauchuk reassigned KAFKA-14919:
--

Assignee: Anton Liauchuk

> MM2 ForwardingAdmin tests should not conflate admin operations
> --
>
> Key: KAFKA-14919
> URL: https://issues.apache.org/jira/browse/KAFKA-14919
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Anton Liauchuk
>Priority: Minor
>  Labels: newbie
>
> The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special 
> implementation of ForwardingAdmin which records admin operations in a static 
> ConcurrentMap, which is then used to perform assertions.
> This has the problem that one variable (allTopics) is used to perform 
> assertions for multiple different methods (adding topics, adding partitions, 
> and syncing configs), despite these operations each being tested separately. 
> This leads to the confusing behavior where each test appears to assert that a 
> particular operation has taken place, and instead asserts that at least one 
> of the operations has taken place. This allows a regression or timeout in one 
> operation to be hidden by the others, making the behavior of the tests much 
> less predictable.
> These tests and/or the metadata store should be changed so that the tests are 
> isolated from one another, and actually perform the assertions that 
> correspond to their titles.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
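
For what the fix could look like, here is a hedged sketch (names are invented,
not the actual test code): give each admin operation its own recording
structure, so an assertion about one operation cannot be satisfied by a
different one.

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// One structure per operation instead of a shared "allTopics" map, so a
// regression or timeout in one operation cannot be hidden by the others.
final class RecordingAdminState {
    static final Set<String> createdTopics = ConcurrentHashMap.newKeySet();
    static final Set<String> partitionIncreases = ConcurrentHashMap.newKeySet();
    static final Map<String, String> alteredConfigs = new ConcurrentHashMap<>();

    static void reset() {
        createdTopics.clear();
        partitionIncreases.clear();
        alteredConfigs.clear();
    }
}
{code}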


[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-02-26 Thread Chaitanya Mukka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820801#comment-17820801
 ] 

Chaitanya Mukka commented on KAFKA-16223:
-

Ah sure. Let me do that. Currently, the tests are pretty modular, but maybe
creating a new class will help us track progress as well.

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503073175


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##
@@ -137,31 +136,43 @@ public void add(T event) throws 
RejectedExecutionException {
 }
 
 /**
- * Returns the next {{@link Event}} available. This method block 
indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
  *
- * @return The next event.
+ * @return The next event available or null.
  */
 public T poll() {
-return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+lock.lock();
+try {
+K key = randomKey();
+if (key == null) return null;
+
+Queue<T> queue = queues.get(key);
+T event = queue.poll();
+
+if (queue.isEmpty()) queues.remove(key);
+inflightKeys.add(key);
+size--;
+
+return event;
+} finally {
+lock.unlock();
+}
 }
 
 /**
- * Returns the next {{@link Event}} available. This method blocks for the 
provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or the thread is interrupted.

Review Comment:
   I noticed we just catch interrupted exceptions. Is that correct? Or should
we adjust the comment to say "or the accumulator is closed"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
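
For readers skimming the diff, the poll()/take() split boils down to the
pattern below. This is a deliberately simplified sketch (it omits the
inflight-key bookkeeping and the exact key-selection logic of the real
EventAccumulator): poll() returns immediately with null when nothing is ready,
while take() parks on a Condition until an event arrives or the thread is
interrupted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class KeyedAccumulatorSketch<K, T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Map<K, Queue<T>> queues = new HashMap<>();
    private final Random random = new Random();
    private int size = 0;

    public void add(K key, T event) {
        lock.lock();
        try {
            queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
            size++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Non-blocking: returns the next event, or null if none is available. */
    public T poll() {
        lock.lock();
        try {
            if (size == 0) return null;
            List<K> keys = new ArrayList<>(queues.keySet());
            K key = keys.get(random.nextInt(keys.size()));
            Queue<T> queue = queues.get(key);
            T event = queue.poll();
            if (queue.isEmpty()) queues.remove(key);
            size--;
            return event;
        } finally {
            lock.unlock();
        }
    }

    /** Blocking: waits until an event is available or the thread is interrupted. */
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) notEmpty.await();
            return poll(); // ReentrantLock is reentrant, so this is safe here
        } finally {
            lock.unlock();
        }
    }
}
```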



Re: [PR] migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


agavra commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1503025048


##
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/StreamJoinedTest.scala:
##
@@ -21,24 +21,21 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
 import org.apache.kafka.streams.scala.serialization.Serdes
 import org.apache.kafka.streams.scala.serialization.Serdes._
 import org.apache.kafka.streams.state.Stores
-import org.easymock.EasyMock
-import org.easymock.EasyMock.{createMock, replay}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.Mockito
 
 import java.time.Duration
 
 class StreamJoinedTest {

Review Comment:
   the equivalent with JUnit5 is:
   ```
   @ExtendWith(Array(classOf[MockitoExtension]))
   @MockitoSettings(strictness = Strictness.STRICT_STUBS)
   ```
   I've added that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
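
For readers following along, here is a self-contained JUnit 5 example of the
strict-stubs setup discussed in this thread (plain Java rather than the Scala
test in the PR); with STRICT_STUBS, Mockito fails a test whose stubbing is
never exercised:

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.util.List;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class StrictStubsExampleTest {

    @Mock
    private List<String> list;

    @Test
    void usedStubPasses() {
        Mockito.when(list.size()).thenReturn(3);
        Assertions.assertEquals(3, list.size());
        // If list.size() were never called, STRICT_STUBS would fail the test
        // with UnnecessaryStubbingException instead of silently passing.
    }
}
```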



[jira] [Assigned] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-02-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-16307:
---

Assignee: Jeff Kim

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> The metric does not seem to be accurate, nor reporting metrics at every 
> interval. Requires investigation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
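
As a rough illustration of what an accurate version of this metric needs to
capture (purely a sketch; the names and windowing are assumptions, not the
coordinator runtime code): accumulate the time threads spend waiting for
events, and report it as a fraction of the elapsed interval, resetting the
window on every report.

{code:java}
// Sketch only: worker threads call recordIdle() with the time they spent
// blocked, and a reporter thread calls reportAndReset() once per interval.
final class IdleRatioTracker {
    private long idleNanos = 0;
    private long windowStartNanos = System.nanoTime();

    synchronized void recordIdle(long nanos) {
        idleNanos += nanos;
    }

    synchronized double reportAndReset() {
        long now = System.nanoTime();
        long elapsed = Math.max(1, now - windowStartNanos);
        double ratio = Math.min(1.0, (double) idleNanos / elapsed);
        idleNanos = 0;
        windowStartNanos = now;
        return ratio;
    }
}
{code}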


[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-02-26 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16307:
-
Description: The metric does not seem to be accurate, nor reporting metrics 
at every interval. Requires investigation  (was: The metric does not seem to be 
accurate. Requires investigation)

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Priority: Major
>
> The metric does not seem to be accurate, nor reporting metrics at every 
> interval. Requires investigation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-02-26 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16307:
-
Summary: fix EventAccumulator thread idle ratio metric  (was: investigate 
EventAccumulator thread idle ratio metric)

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Priority: Major
>
> The metric does not seem to 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16307) investigate EventAccumulator thread idle ratio metric

2024-02-26 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16307:
-
Description: The metric does not seem to 

> investigate EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Priority: Major
>
> The metric does not seem to 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16307) investigate EventAccumulator thread idle ratio metric

2024-02-26 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16307:


 Summary: investigate EventAccumulator thread idle ratio metric
 Key: KAFKA-16307
 URL: https://issues.apache.org/jira/browse/KAFKA-16307
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-02-26 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16307:
-
Description: The metric does not seem to be accurate. Requires 
investigation  (was: The metric does not seem to )

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Priority: Major
>
> The metric does not seem to be accurate. Requires investigation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured

2024-02-26 Thread Anton Liauchuk (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Liauchuk reassigned KAFKA-16306:
--

Assignee: Jeff Kim  (was: Anton Liauchuk)

> GroupCoordinatorService logger is not configured
> 
>
> Key: KAFKA-16306
> URL: https://issues.apache.org/jira/browse/KAFKA-16306
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Minor
>
> The GroupCoordinatorService constructor initializes with the wrong logger 
> class:
> ```
> GroupCoordinatorService(
> LogContext logContext,
> GroupCoordinatorConfig config,
> CoordinatorRuntime runtime,
> GroupCoordinatorMetrics groupCoordinatorMetrics
> ) {
>     this.log = logContext.logger(CoordinatorLoader.class);
>     this.config = config;
>     this.runtime = runtime;
>     this.groupCoordinatorMetrics = groupCoordinatorMetrics;
> }
> ```
> change this to GroupCoordinatorService.class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16306: fix GroupCoordinatorService logger [kafka]

2024-02-26 Thread via GitHub


jeffkbkim opened a new pull request, #15433:
URL: https://github.com/apache/kafka/pull/15433

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured

2024-02-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-16306:
---

Assignee: Jeff Kim

> GroupCoordinatorService logger is not configured
> 
>
> Key: KAFKA-16306
> URL: https://issues.apache.org/jira/browse/KAFKA-16306
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Minor
>
> The GroupCoordinatorService constructor initializes with the wrong logger 
> class:
> ```
> GroupCoordinatorService(
> LogContext logContext,
> GroupCoordinatorConfig config,
> CoordinatorRuntime runtime,
> GroupCoordinatorMetrics groupCoordinatorMetrics
> ) {
>     this.log = logContext.logger(CoordinatorLoader.class);
>     this.config = config;
>     this.runtime = runtime;
>     this.groupCoordinatorMetrics = groupCoordinatorMetrics;
> }
> ```
> change this to GroupCoordinatorService.class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured

2024-02-26 Thread Anton Liauchuk (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Liauchuk reassigned KAFKA-16306:
--

Assignee: Anton Liauchuk  (was: Jeff Kim)

> GroupCoordinatorService logger is not configured
> 
>
> Key: KAFKA-16306
> URL: https://issues.apache.org/jira/browse/KAFKA-16306
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Anton Liauchuk
>Priority: Minor
>
> The GroupCoordinatorService constructor initializes with the wrong logger 
> class:
> ```
> GroupCoordinatorService(
> LogContext logContext,
> GroupCoordinatorConfig config,
> CoordinatorRuntime runtime,
> GroupCoordinatorMetrics groupCoordinatorMetrics
> ) {
>     this.log = logContext.logger(CoordinatorLoader.class);
>     this.config = config;
>     this.runtime = runtime;
>     this.groupCoordinatorMetrics = groupCoordinatorMetrics;
> }
> ```
> change this to GroupCoordinatorService.class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16306) GroupCoordinatorService logger is not configured

2024-02-26 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16306:


 Summary: GroupCoordinatorService logger is not configured
 Key: KAFKA-16306
 URL: https://issues.apache.org/jira/browse/KAFKA-16306
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim


The GroupCoordinatorService constructor initializes with the wrong logger class:

```

GroupCoordinatorService(
LogContext logContext,
GroupCoordinatorConfig config,
CoordinatorRuntime runtime,
GroupCoordinatorMetrics groupCoordinatorMetrics
) {
    this.log = logContext.logger(CoordinatorLoader.class);
    this.config = config;
    this.runtime = runtime;
    this.groupCoordinatorMetrics = groupCoordinatorMetrics;
}

```

change this to GroupCoordinatorService.class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
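
The fix itself is a one-liner. A minimal sketch of the corrected constructor
(the class and field names mirror the snippet above; this is illustrative, not
the full class):

{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class GroupCoordinatorServiceSketch {
    private final Logger log;

    GroupCoordinatorServiceSketch(LogContext logContext) {
        // was: logContext.logger(CoordinatorLoader.class), which attributed
        // these log lines to the wrong class
        this.log = logContext.logger(GroupCoordinatorServiceSketch.class);
    }
}
{code}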


[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-02-26 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820747#comment-17820747
 ] 

Kirk True commented on KAFKA-16160:
---

[~pnee]—do you have some insights on how [~phuctran] can reproduce this? Thanks!

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
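
The "backoff a bit for that given node and retry later" idea could look roughly
like the sketch below (illustrative names and bounds, not the
NetworkClientDelegate implementation): track the next allowed attempt per node
and double the delay on each failure up to a cap.

{code:java}
import java.util.HashMap;
import java.util.Map;

final class PerNodeBackoff {
    private final Map<Integer, Long> nextAttemptMs = new HashMap<>();
    private final Map<Integer, Long> backoffMs = new HashMap<>();
    private final long initialBackoffMs;
    private final long maxBackoffMs;

    PerNodeBackoff(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    boolean canAttempt(int nodeId, long nowMs) {
        return nowMs >= nextAttemptMs.getOrDefault(nodeId, 0L);
    }

    void onFailure(int nodeId, long nowMs) {
        long current = backoffMs.getOrDefault(nodeId, initialBackoffMs);
        nextAttemptMs.put(nodeId, nowMs + current);
        backoffMs.put(nodeId, Math.min(maxBackoffMs, current * 2)); // exponential, capped
    }

    void onSuccess(int nodeId) {
        nextAttemptMs.remove(nodeId);
        backoffMs.remove(nodeId);
    }
}
{code}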


[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-02-26 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820746#comment-17820746
 ] 

Kirk True commented on KAFKA-16190:
---

[~lucasbru]—could you take a look at the pull request for this Jira? Thanks!

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is reset on failure scenarios, which 
> should cover the fence+rejoin sequence). 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this exact case given that it does explicitly change the subscription when it 
> gets fenced. We should ensure we test a consumer that keeps it same initial 
> subscription when it rejoins after being fenced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]

2024-02-26 Thread via GitHub


dajac opened a new pull request, #15432:
URL: https://github.com/apache/kafka/pull/15432

   I have noticed the following log when a __consumer_offsets partition 
migrates away from a broker. It happens because the event is queued up after 
the event that unloads the state machine. This patch fixes it and another 
similar one.
   
   ```
   [2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of 
UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not 
the correct coordinator.. 
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
   org.apache.kafka.common.errors.NotCoordinatorException: This is not the 
correct coordinator.
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-26 Thread via GitHub


dajac commented on PR #15364:
URL: https://github.com/apache/kafka/pull/15364#issuecomment-1964189888

   @jeffkbkim Thanks for your comments. I have addressed all of them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-26 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1502633032


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2079,253 +2056,20 @@ public void testReconciliationProcess() {
 
 assertRecordsEquals(Collections.singletonList(
 RecordHelpers.newCurrentAssignmentRecord(groupId, new 
ConsumerGroupMember.Builder(memberId3)
+.setState(MemberState.STABLE)
 .setMemberEpoch(11)
 .setPreviousMemberEpoch(11)
-.setTargetMemberEpoch(11)
 .setAssignedPartitions(mkAssignment(
 mkTopicAssignment(fooTopicId, 4, 5),
 mkTopicAssignment(barTopicId, 1)))
 .build())),
 result.records()
 );
 
-assertEquals(ConsumerGroupMember.MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId3));
+assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId3));
 assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, 
context.consumerGroupState(groupId));
 }
 
-@Test
-public void testReconciliationRestartsWhenNewTargetAssignmentIsInstalled() 
{

Review Comment:
   Correct.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List<Record> records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   Yeah, that's right.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
  * @return A new ConsumerGroupMember or the current one.
  */
 public ConsumerGroupMember 

[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-26 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677
 ] 

Ahmed Sobeh edited comment on KAFKA-16282 at 2/26/24 1:23 PM:
--

Thanks both [~showuon]  @! I just finished writing up the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]
 can you please take a look and let me know if you have any comments before I 
send it out to the mailing list?


was (Author: JIRAUSER295920):
Thanks both! I just finished writing up the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]
 can you please take a look and let me know if you have any comments before I 
send it out to the mailing list?

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the "latest" option, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
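
Independent of the CLI change, the underlying admin call the KIP builds on is
already available. A minimal sketch of fetching the LSO programmatically (the
bootstrap address and topic name are placeholders):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class LsoLookupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // READ_COMMITTED makes "latest" resolve to the last stable offset
            // (LSO) rather than the high watermark
            ListOffsetsResult result = admin.listOffsets(
                Collections.singletonMap(tp, OffsetSpec.latest()),
                new ListOffsetsOptions(IsolationLevel.READ_COMMITTED));
            System.out.println("LSO for " + tp + ": "
                + result.partitionResult(tp).get().offset());
        }
    }
}
{code}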


[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-26 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677
 ] 

Ahmed Sobeh edited comment on KAFKA-16282 at 2/26/24 1:23 PM:
--

Thanks both [~showuon]  [~jolshan] ! I just finished writing up the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]
 can you please take a look and let me know if you have any comments before I 
send it out to the mailing list?


was (Author: JIRAUSER295920):
Thanks both [~showuon]  @! I just finished writing up the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]
 can you please take a look and let me know if you have any comments before I 
send it out to the mailing list?

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the "latest" option, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] migrate StreamedJoinTest to Mockito [kafka]

2024-02-26 Thread via GitHub


divijvaidya commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1502590907


##
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/StreamJoinedTest.scala:
##
@@ -21,24 +21,21 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
 import org.apache.kafka.streams.scala.serialization.Serdes
 import org.apache.kafka.streams.scala.serialization.Serdes._
 import org.apache.kafka.streams.state.Stores
-import org.easymock.EasyMock
-import org.easymock.EasyMock.{createMock, replay}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.Mockito
 
 import java.time.Duration
 
 class StreamJoinedTest {

Review Comment:
   Please add the annotation `@RunWith(MockitoJUnitRunner.StrictStubs.class)` 
and resolve any errors that it may surface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake

2024-02-26 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16305:
--
Summary: Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS 
handshake  (was: # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS 
handshake)

> Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
> --
>
> Key: KAFKA-16305
> URL: https://issues.apache.org/jira/browse/KAFKA-16305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
>
> Kafka allows users to configure custom {{SSLEngine}} via the 
> {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL 
> based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation 
> for performance reasons.
> While trying to use a Netty/Openssl based SSLEngine, we observe that the 
> server hangs while performing the TLS handshake.  We observe the following 
> logs
> {code}
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] 
> Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] 
> XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, 
> netWriteBuffer pos 0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 doRead true
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status 
> BUFFER_UNDERFLOW read 0
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW 
> HandshakeStatus = NEED_UNWRAP 

[jira] [Created] (KAFKA-16305) # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake

2024-02-26 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16305:
-

 Summary: # Optimisation in SslTransportLayer:handshakeUnwrap 
stalls TLS handshake
 Key: KAFKA-16305
 URL: https://issues.apache.org/jira/browse/KAFKA-16305
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.1, 3.7.0
Reporter: Gaurav Narula


Kafka allows users to configure custom {{SSLEngine}} via the 
{{SslEngineFactory}} interface. There have been attempts to use an OpenSSL 
based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation 
for performance reasons.

While trying to use a Netty/Openssl based SSLEngine, we observe that the server 
hangs while performing the TLS handshake.  We observe the following logs


{code}
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] 
Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] 
XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 
127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, 
netWriteBuffer pos 0
2024-02-26 01:40:00,117 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 
127.0.0.1:60045-127.0.0.1:60046-0 doRead true
2024-02-26 01:40:00,118 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 
127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status 
BUFFER_UNDERFLOW read 0
2024-02-26 01:40:00,118 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 
127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW 
HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, 
appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 handshakeStatus 
NEED_UNWRAP
2024-02-26 01:40:00,118 
data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
SslTransportLayer- [SslTransportLayer 
channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 

[jira] [Assigned] (KAFKA-16305) # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake

2024-02-26 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-16305:
-

Assignee: Gaurav Narula

> # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
> 
>
> Key: KAFKA-16305
> URL: https://issues.apache.org/jira/browse/KAFKA-16305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
>
> Kafka allows users to configure custom {{SSLEngine}} via the 
> {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL 
> based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation 
> for performance reasons.
> While trying to use a Netty/Openssl based SSLEngine, we observe that the 
> server hangs while performing the TLS handshake.  We observe the following 
> logs
> {code}
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] 
> Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] 
> XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, 
> netWriteBuffer pos 0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 doRead true
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status 
> BUFFER_UNDERFLOW read 0
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW 
> HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, 
> appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 
> handshakeStatus 
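
The BUFFER_UNDERFLOW status in these logs means the engine needs more bytes
from the socket before it can make progress. A generic sketch of an unwrap loop
that handles it (assumes a blocking channel and buffers sized from the
session's packet/application buffer sizes; this is illustrative, not the
SslTransportLayer code):

{code:java}
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class HandshakeUnwrapSketch {
    // netRead must hold at least engine.getSession().getPacketBufferSize()
    // bytes, appRead at least getApplicationBufferSize() bytes
    static SSLEngineResult unwrapUntilProgress(SSLEngine engine,
                                               SocketChannel channel,
                                               ByteBuffer netRead,
                                               ByteBuffer appRead) throws IOException {
        while (true) {
            netRead.flip();
            SSLEngineResult result = engine.unwrap(netRead, appRead);
            netRead.compact();
            if (result.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
                // not enough network bytes for a full TLS record: read more
                // from the socket instead of re-unwrapping the same buffer
                if (channel.read(netRead) < 0) {
                    throw new IOException("channel closed during handshake");
                }
                continue;
            }
            return result;
        }
    }
}
{code}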

Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


msn-tldr commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
 Mockito.reset(mockListener);
 }
 
+/**
+ * Test that concurrently updating Metadata, and fetching the 
corresponding MetadataSnapshot & Cluster work as expected, i.e.
+ * snapshot & cluster contain the relevant updates.
+ */
+@Test
+public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws 
InterruptedException {
+Time time = new MockTime();
+metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+// Setup metadata with 10 nodes, 2 topics, topic1 & 2, both to be 
retained in the update. Both will have leader-epoch 100.
+int oldNodeCount = 10;
+String topic1 = "test_topic1";
+String topic2 = "test_topic2";
+TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+Map<String, Integer> topicPartitionCounts = new HashMap<>();
+int oldPartitionCount = 1;
+topicPartitionCounts.put(topic1, oldPartitionCount);
+topicPartitionCounts.put(topic2, oldPartitionCount);
+Map<String, Uuid> topicIds = new HashMap<>();
+topicIds.put(topic1, Uuid.randomUuid());
+topicIds.put(topic2, Uuid.randomUuid());
+int oldLeaderEpoch = 100;
+MetadataResponse metadataResponse =
+RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, 
Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, true, 
time.milliseconds());
+MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+Cluster cluster = metadata.fetch();
+// Validate metadata snapshot & cluster are setup as expected.
+assertEquals(cluster, snapshot.cluster());
+assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic1));
+assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic2));
+assertEquals(OptionalInt.of(oldLeaderEpoch), 
snapshot.leaderEpochFor(topic1Part0));
+
+// Setup 6 threads, where 3 are updating metadata & 3 are reading 
snapshot/cluster.
+// Metadata will be updated with higher # of nodes, partition-counts, 
leader-epoch.
+int numThreads = 6;
+ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   done at the end of the test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
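
For context on the "done at the end of the test" reply: the usual cleanup
pattern for such a pool is to shut it down in a finally block and bound the
wait, roughly as below (a generic sketch, not the test's exact code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorCleanupSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(6);
        try {
            for (int i = 0; i < 6; i++) {
                service.submit(() -> { /* updater or reader work */ });
            }
        } finally {
            service.shutdown(); // stop accepting new tasks
            if (!service.awaitTermination(30, TimeUnit.SECONDS)) {
                service.shutdownNow(); // interrupt any stragglers
            }
        }
    }
}
```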



Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]

2024-02-26 Thread via GitHub


msn-tldr commented on code in PR #15385:
URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart))
 Mockito.reset(mockListener);
 }
 
+/**
+ * Test that concurrently updating Metadata, and fetching the 
corresponding MetadataSnapshot & Cluster work as expected, i.e.
+ * snapshot & cluster contain the relevant updates.
+ */
+@Test
+public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws 
InterruptedException {
+Time time = new MockTime();
+metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+// Setup metadata with 10 nodes, 2 topics, topic1 & 2, both to be 
retained in the update. Both will have leader-epoch 100.
+int oldNodeCount = 10;
+String topic1 = "test_topic1";
+String topic2 = "test_topic2";
+TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+Map<String, Integer> topicPartitionCounts = new HashMap<>();
+int oldPartitionCount = 1;
+topicPartitionCounts.put(topic1, oldPartitionCount);
+topicPartitionCounts.put(topic2, oldPartitionCount);
+Map<String, Uuid> topicIds = new HashMap<>();
+topicIds.put(topic1, Uuid.randomUuid());
+topicIds.put(topic2, Uuid.randomUuid());
+int oldLeaderEpoch = 100;
+MetadataResponse metadataResponse =
+RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, 
Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, true, 
time.milliseconds());
+MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+Cluster cluster = metadata.fetch();
+// Validate metadata snapshot & cluster are setup as expected.
+assertEquals(cluster, snapshot.cluster());
+assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic1));
+assertEquals(oldPartitionCount, 
snapshot.cluster().partitionCountForTopic(topic2));
+assertEquals(OptionalInt.of(oldLeaderEpoch), 
snapshot.leaderEpochFor(topic1Part0));
+
+// Setup 6 threads, where 3 are updating metadata & 3 are reading 
snapshot/cluster.
+// Metadata will be updated with higher # of nodes, partition-counts, 
leader-epoch.
+int numThreads = 6;
+ExecutorService service = Executors.newFixedThreadPool(numThreads);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-02-26 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677
 ] 

Ahmed Sobeh commented on KAFKA-16282:
-

Thanks both! I just finished writing up the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]
 can you please take a look and let me know if you have any comments before I 
send it out to the mailing list?

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
> -
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Ahmed Sobeh
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we 
> have these choices:
> {code:java}
> --time  /  timestamp of the offsets before 
> that. 
>   -1 or latest /   [Note: No offset is returned, if 
> the
>   -2 or earliest / timestamp greater than recently
>   -3 or max-timestamp /committed record timestamp is
>   -4 or earliest-local /   given.] (default: latest)
>   -5 or latest-tiered  
> {code}
> For the "latest" option, it'll always return the "high watermark" because we 
> always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It 
> would be good if the command can support to get the last stable offset (LSO) 
> for transaction support. That is, sending the option with 
> *IsolationLevel.READ_COMMITTED*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT [kafka]

2024-02-26 Thread via GitHub


stanislavkozlovski opened a new pull request, #15431:
URL: https://github.com/apache/kafka/pull/15431

   This patch updates the 3.7 release branch's version to 3.7.1-SNAPSHOT as per 
the wiki process


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]

2024-02-26 Thread via GitHub


mimaison commented on code in PR #15295:
URL: https://github.com/apache/kafka/pull/15295#discussion_r1502397230


##
core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala:
##
@@ -53,17 +58,26 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends 
QuorumTestHarness wit
   val topic = "topic1"
   val msg = new Array[Byte](1000)
   val msgBigger = new Array[Byte](1)
-  var brokers: Seq[KafkaServer] = _
+  var brokers: Seq[KafkaBroker] = _
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: Consumer[Array[Byte], Array[Byte]] = _
 
+  private def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+  private def listenerName: ListenerName = 
ListenerName.forSecurityProtocol(securityProtocol)
+
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+if (TestInfoUtils.isKRaft(testInfo) && 
metadataVersion.isLessThan(IBP_3_3_IV0)) {

Review Comment:
   Can you explain why we need this change? `metadataVersion` is hard-coded to 
`MetadataVersion.latestTesting`; how can it be less than `IBP_3_3_IV0`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-26 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820663#comment-17820663
 ] 

Lucas Brutschy commented on KAFKA-16008:


Fixed by https://issues.apache.org/jira/browse/KAFKA-16258

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-26 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-16008.

Resolution: Duplicate

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-26 Thread via GitHub


lucasbru merged PR #15415:
URL: https://github.com/apache/kafka/pull/15415


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Optimize EventAccumulator [kafka]

2024-02-26 Thread via GitHub


dajac opened a new pull request, #15430:
URL: https://github.com/apache/kafka/pull/15430

   `poll(long timeout, TimeUnit unit)` is only ever called with `Long.MAX_VALUE` or 
   `0`. This patch replaces those call sites with the blocking `take` and the 
   non-blocking `poll`, which removes the `awaitNanos` usage. A minimal sketch of 
   the resulting pattern is shown below.
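   
   As an illustration (a hedged sketch only: the queue, class, and method names 
   below are assumed for this example and are not the actual `EventAccumulator` 
   internals):
   
       import java.util.concurrent.LinkedBlockingQueue;
   
       public class EventQueueSketch<T> {
           private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
   
           // Stands in for poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS): take()
           // blocks until an element is available, with no awaitNanos bookkeeping.
           public T takeEvent() throws InterruptedException {
               return queue.take();
           }
   
           // Stands in for poll(0, TimeUnit.MILLISECONDS): poll() returns null
           // immediately when the queue is empty.
           public T pollEvent() {
               return queue.poll();
           }
       }
   
   Splitting the timed call into dedicated blocking and non-blocking variants also 
   makes the intent of each call site explicit.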
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 With apache kerby [kafka]

2024-02-26 Thread via GitHub


mimaison commented on code in PR #15277:
URL: https://github.com/apache/kafka/pull/15277#discussion_r1502366164


##
core/src/test/scala/kafka/security/minikdc/MiniKdc.scala:
##
@@ -19,38 +19,22 @@
 package kafka.security.minikdc
 
 import java.io._
-import java.net.InetSocketAddress
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.text.MessageFormat
 import java.util.{Locale, Properties, UUID}
-
 import kafka.utils.{CoreUtils, Exit, Logging}
+import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
 
 import scala.jdk.CollectionConverters._
-import org.apache.commons.lang.text.StrSubstitutor
-import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry}
-import org.apache.directory.api.ldap.model.ldif.LdifReader
-import org.apache.directory.api.ldap.model.name.Dn
-import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor
-import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader
-import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager
-import org.apache.directory.server.constants.ServerDNConstants
-import org.apache.directory.server.core.DefaultDirectoryService
-import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout}
-import org.apache.directory.server.core.api.schema.SchemaPartition
-import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor
-import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition}
-import org.apache.directory.server.core.partition.ldif.LdifPartition
-import org.apache.directory.server.kerberos.KerberosConfig
-import org.apache.directory.server.kerberos.kdc.KdcServer
-import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
-import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
-import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
-import org.apache.directory.server.xdbm.Index
-import org.apache.directory.shared.kerberos.KerberosTime
+import org.apache.kerby.kerberos.kerb.KrbException
+import org.apache.kerby.kerberos.kerb.identity.backend.BackendConfig
+import org.apache.kerby.kerberos.kerb.server.{KdcConfig, KdcConfigKey, SimpleKdcServer}
 import org.apache.kafka.common.utils.{Java, Utils}
-
+import org.apache.kerby.kerberos.kerb.`type`.KerberosTime
+import org.apache.kerby.kerberos.kerb.`type`.base.{EncryptionKey, PrincipalName}
+import org.apache.kerby.kerberos.kerb.keytab.{Keytab, KeytabEntry}
+import org.apache.kerby.util.NetworkUtil
 /**
   * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone

Review Comment:
   I think this comment still needs to be addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-02-26 Thread Anatolii Popov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatolii Popov updated KAFKA-16105:
---
Labels: tiered-storage  (was: )

> Reassignment of tiered topics is failing due to RemoteStorageException
> --
>
> Key: KAFKA-16105
> URL: https://issues.apache.org/jira/browse/KAFKA-16105
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Reporter: Anatolii Popov
>Priority: Critical
>  Labels: tiered-storage
>
> When partition reassignment is happening for a tiered topic, in most cases it 
> gets stuck with RemoteStorageExceptions on follower nodes saying that the 
> follower cannot construct the remote log auxiliary state:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
>     at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>     at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>     at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>     at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>     at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>     at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
> {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic of 30 partitions. All partitions 
> have tiered segments.
> 3 more nodes are added to the cluster, and a reassignment is started to move 
> all the data to the new nodes.
> Behavior:
> For most of the partitions, reassignment completes smoothly.
> For some of the partitions, when a new node starts to receive assignments it 
> reads the __remote_log_metadata topic and tries to initialize the metadata 
> cache from records with COPY_SEGMENT_STARTED. If it reads such a message for 
> a partition before that partition was assigned to this specific node, it 
> ignores the message, so it skips the cache initialization and marks the 
> partition as assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never 
> 

Re: [PR] KAFKA-14747: record discarded FK join subscription responses [kafka]

2024-02-26 Thread via GitHub


AyoubOm commented on PR #15395:
URL: https://github.com/apache/kafka/pull/15395#issuecomment-1963731421

   @mjsax Please have a look when you have some time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix ReadOnlySessionStore java docs [kafka]

2024-02-26 Thread via GitHub


AyoubOm commented on PR #15412:
URL: https://github.com/apache/kafka/pull/15412#issuecomment-1963723331

   @mimaison Please check this when you have some time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-26 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1502279590


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -20,83 +20,258 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroups(String quorum) throws Exception {
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
         addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup));
+
+        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP));
         final AtomicReference<scala.collection.Set<String>> foundGroups = new AtomicReference<>();
+
         TestUtils.waitForCondition(() -> {
             foundGroups.set(service.listConsumerGroups().toSet());
             return Objects.equals(expectedGroups, foundGroups.get());
         }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + ".");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
+    @Test
     public void testListWithUnrecognizedNewConsumerOption() {
         String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"};
         assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroupsWithStates() throws Exception {
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"};
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
-        scala.collection.Set<ConsumerGroupListing> expectedListing = set(Arrays.asList(
-            new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
-            new ConsumerGroupListing(GROUP, false, Optional.of(ConsumerGroupState.STABLE))));
+        Set<ConsumerGroupListing> expectedListing = new HashSet<>(Arrays.asList(
+            new ConsumerGroupListing(
+                simpleGroup,
+                true,
+                Optional.of(ConsumerGroupState.EMPTY),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.parse(groupProtocol))
+            )
+        ));
 
-        final AtomicReference<scala.collection.Set<ConsumerGroupListing>> foundListing = new AtomicReference<>();
-        TestUtils.waitForCondition(() -> {
-