[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820943#comment-17820943 ]

Luke Chen commented on KAFKA-16282:
---

[~ahmedsobeh], thanks for the drafted KIP. I think you can refer to [KIP-1005|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset] for how to write a similar KIP. Other than that, you can directly start a discuss thread on the dev mailing list as Justine suggested.

> Allow to get last stable offset (LSO) in kafka-get-offsets.sh
>
> Key: KAFKA-16282
> URL: https://issues.apache.org/jira/browse/KAFKA-16282
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Ahmed Sobeh
> Priority: Major
> Labels: need-kip, newbie, newbie++
>
> Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:
> {code:java}
> --time /                  timestamp of the offsets before that.
> -1 or latest /            [Note: No offset is returned, if the
> -2 or earliest /          timestamp greater than recently
> -3 or max-timestamp /     committed record timestamp is
> -4 or earliest-local /    given.] (default: latest)
> -5 or latest-tiered
> {code}
> For the "latest" option, it will always return the "high watermark" because we always send the request with the default option: *IsolationLevel.READ_UNCOMMITTED*. It would be good if the command could also return the last stable offset (LSO) to support transactions, that is, by sending the request with *IsolationLevel.READ_COMMITTED*.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
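The difference the ticket describes can be modelled in a few lines. This is a simplified sketch, not broker code: it assumes the LSO is the first offset of the earliest still-open transaction, capped at the high watermark, and the class and method names are hypothetical. From the Java Admin client, a comparable query is `Admin.listOffsets` with `OffsetSpec.latest()` and `new ListOffsetsOptions(IsolationLevel.READ_COMMITTED)`.

```java
import java.util.List;

// Hypothetical model, not Kafka broker code: contrasts the high watermark
// (what a READ_UNCOMMITTED "latest" lookup returns) with the last stable
// offset (what a READ_COMMITTED "latest" lookup returns).
final class LastStableOffsetModel {

    // highWatermark: offset up to which records are replicated.
    // openTxnFirstOffsets: first offsets of transactions that are neither
    // committed nor aborted yet (assumed input for this sketch).
    static long lastStableOffset(long highWatermark, List<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            // An open transaction holds the LSO back at its first offset.
            lso = Math.min(lso, first);
        }
        return lso;
    }

    public static void main(String[] args) {
        long hw = 120L;
        System.out.println("HW  (READ_UNCOMMITTED latest): " + hw);
        // Transactions that started at offsets 100 and 110 are still open.
        System.out.println("LSO (READ_COMMITTED latest):   " + lastStableOffset(hw, List.of(100L, 110L)));
        // With no open transactions, the LSO equals the HW.
        System.out.println("LSO with no open transactions: " + lastStableOffset(hw, List.of()));
    }
}
```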
Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]
ableegoldman merged PR #15424:
URL: https://github.com/apache/kafka/pull/15424

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]
ableegoldman commented on PR #15424:
URL: https://github.com/apache/kafka/pull/15424#issuecomment-1965693809

Looks like a new build was triggered when I renamed the PR, but I have the results from before that and they looked good. Test failures are all unrelated. Will merge this to trunk.

Build from before the name change: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15424/2/tests/
Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]
ableegoldman commented on PR #15424:
URL: https://github.com/apache/kafka/pull/15424#issuecomment-1965688725

Remember to name your PRs correctly: I added the `KAFKA-15215` prefix since this came about from that ticket, but you can always use `MINOR:` for small fixes that don't have a corresponding ticket.
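The naming rule from this comment can be expressed as a simple check. This is a hypothetical helper for illustration only (`PrTitleCheck` is not part of Kafka's build or CI); it accepts titles prefixed with a JIRA key such as `KAFKA-15215:` or with `MINOR:`, the two forms the comment mentions.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not Kafka tooling: checks the PR title convention
// "KAFKA-<number>: ..." or "MINOR: ..." described in the review comment.
final class PrTitleCheck {
    private static final Pattern TITLE = Pattern.compile("^(KAFKA-\\d+|MINOR): \\S.*");

    static boolean isWellFormed(String title) {
        return TITLE.matcher(title).matches();
    }

    public static void main(String[] args) {
        String[] titles = {
            "KAFKA-15215: migrate StreamedJoinTest to Mockito",
            "MINOR: Add 3.7.0 to system tests",
            "migrate StreamedJoinTest to Mockito" // missing prefix
        };
        for (String t : titles) {
            System.out.println(isWellFormed(t) + "  " + t);
        }
    }
}
```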
Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]
agavra commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1503563459

## build.gradle:
## @@ -2333,6 +2333,8 @@ project(':streams:streams-scala') {
   testImplementation libs.junitJupiter
   testImplementation libs.easymock
+  testImplementation libs.mockitoCore

Review Comment:
I checked. Looks like no tests in the streams scala tests used Mockito yet.
Re: [PR] KAFKA-15215: migrate StreamedJoinTest to Mockito [kafka]
ableegoldman commented on code in PR #15424:
URL: https://github.com/apache/kafka/pull/15424#discussion_r1503562979

## build.gradle:
## @@ -2333,6 +2333,8 @@ project(':streams:streams-scala') {
   testImplementation libs.junitJupiter
   testImplementation libs.easymock
+  testImplementation libs.mockitoCore

Review Comment:
Wait... why do we need to add a dependency for this? I thought the whole point of the change was to align this test with others in the same file -- shouldn't that mean whatever dependencies are needed are already there?
[jira] [Updated] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.
[ https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jialun Peng updated KAFKA-16102:
Description: When I dynamically modify the parameters related to Kafka listeners, such as changing the IP or port value of a listener, the dynamic parameters under the corresponding path in ZooKeeper are updated. However, in reality, the modification of the IP or port for the corresponding listener does not take effect. This phenomenon consistently occurs. And there is a slight improvement as the error "Security protocol cannot be updated for existing listener" will be eliminated.
was: When I dynamically modify the parameters related to Kafka listeners, such as changing the IP or port value of a listener, the dynamic parameters under the corresponding path in ZooKeeper are updated. However, in reality, the modification of the IP or port for the corresponding listener does not take effect. This phenomenon consistently occurs. And there is a slight improvement as the error "Security protocol cannot be updated for existing listener" will be eliminated.

> about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.
>
> Key: KAFKA-16102
> URL: https://issues.apache.org/jira/browse/KAFKA-16102
> Project: Kafka
> Issue Type: Bug
> Components: config
> Affects Versions: 3.6.0
> Environment: Must be present in any environment.
> Reporter: Jialun Peng
> Assignee: Jialun Peng
> Priority: Minor
> Labels: easyfix
> Fix For: 3.8.0
> Original Estimate: 96h
> Remaining Estimate: 96h
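The report concerns listeners whose host or port changes while the listener name stays the same. As a hypothetical sketch (not Kafka's `DynamicListenerConfig`), the following compares two `listeners` values, assumed to be in the comma-separated `NAME://host:port` form, and lists the existing listener names whose endpoint changed, i.e. the kind of update the report says was persisted but did not take effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not Kafka's DynamicListenerConfig: finds listeners that
// keep their name but change host or port between two "listeners" values.
final class ListenerDiff {

    // Maps listener name -> "host:port"; assumes each entry is NAME://host:port.
    static Map<String, String> parse(String listeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) continue;
            int sep = trimmed.indexOf("://");
            if (sep < 0) throw new IllegalArgumentException("Bad listener: " + trimmed);
            byName.put(trimmed.substring(0, sep), trimmed.substring(sep + 3));
        }
        return byName;
    }

    // Listener names present in both configs whose host:port differs.
    static Set<String> endpointChanged(String oldListeners, String newListeners) {
        Map<String, String> before = parse(oldListeners);
        Map<String, String> after = parse(newListeners);
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            String previous = before.get(e.getKey());
            if (previous != null && !previous.equals(e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        String before = "PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093";
        String after = "PLAINTEXT://0.0.0.0:9094,SSL://0.0.0.0:9093";
        System.out.println(endpointChanged(before, after)); // [PLAINTEXT]
    }
}
```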
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1965678417

@clolov, do we have any update on the compilation error fix?
Re: [PR] KAFKA-16226 Add test for concurrently updatingMetadata and fetching snapshot/cluster [kafka]
hachikuji merged PR #15385:
URL: https://github.com/apache/kafka/pull/15385
Re: [PR] KAFKA-16305: Avoid optimisation in handshakeUnwrap [kafka]
gaurav-narula commented on code in PR #15434:
URL: https://github.com/apache/kafka/pull/15434#discussion_r1503432226

## clients/src/test/java/org/apache/kafka/common/security/ssl/NettySslEngineFactory.java:
## @@ -0,0 +1,159 @@
+package org.apache.kafka.common.security.ssl;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NettySslEngineFactory extends DefaultSslEngineFactory {

Review Comment:
Reverted that commit and added the unit test in 79902f5
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
jeffkbkim commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1503404075

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
## @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
It doesn't seem like other configs in KafkaConfigs are referred to as their own "Config", and the name made me think it would have several configs. How about `GroupProtocolMigrationPolicy` and `group.protocol.migration.policy`? Or we can stick with `group.protocol.migration` and go with `GroupProtocolMigration`.
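The imports in the new file (`Arrays`, `Locale`, `Map`, `Function`, `Collectors`) are consistent with a common pattern for config-value enums: a static map from a lower-case string to each constant, used for case-insensitive parsing. The sketch below uses the reviewer's suggested name `GroupProtocolMigrationPolicy`; the four constants are placeholders chosen for illustration, since the PR's actual values are not visible in this hunk.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: the constants are hypothetical placeholders.
enum GroupProtocolMigrationPolicy {
    DISABLED("disabled"),
    UPGRADE("upgrade"),
    DOWNGRADE("downgrade"),
    BIDIRECTIONAL("bidirectional");

    // The lower-case string used in the config value.
    private final String configValue;

    // Lookup table from config string to constant, built once at class init.
    private static final Map<String, GroupProtocolMigrationPolicy> BY_VALUE =
        Arrays.stream(values())
              .collect(Collectors.toMap(p -> p.configValue, Function.identity()));

    GroupProtocolMigrationPolicy(String configValue) {
        this.configValue = configValue;
    }

    // Case-insensitive parse, e.g. "Upgrade" -> UPGRADE.
    static GroupProtocolMigrationPolicy parse(String value) {
        GroupProtocolMigrationPolicy policy = BY_VALUE.get(value.toLowerCase(Locale.ROOT));
        if (policy == null) {
            throw new IllegalArgumentException("Unknown migration policy: " + value);
        }
        return policy;
    }

    @Override
    public String toString() {
        return configValue;
    }

    public static void main(String[] args) {
        System.out.println(parse("Upgrade")); // upgrade
    }
}
```

Using `Locale.ROOT` keeps the lower-casing independent of the JVM's default locale.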
Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]
jolshan commented on code in PR #15432:
URL: https://github.com/apache/kafka/pull/15432#discussion_r1503408163

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
## @@ -1731,15 +1735,19 @@ public void onNewMetadataImage(
         // Push an event for each coordinator.
         coordinators.keySet().forEach(tp -> {
             scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> {
-                withContextOrThrow(tp, context -> {
-                    if (context.state == CoordinatorState.ACTIVE) {
+                CoordinatorContext context = coordinators.get(tp);
+                if (context != null && context.state == CoordinatorState.ACTIVE) {

Review Comment:
I noticed there are still some places where we don't want the context to be null, i.e. loading the partition. (And for all the run methods we want the coordinator to be active.) Is the general rationale for this change that not having the context/active state is OK if we expect the coordinator moved? Is it possible we could try to load and the coordinator moves again? Just trying to understand the cases where we accept null or non-active coordinators vs. when we throw errors.
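The change replaces `withContextOrThrow` with a lookup that tolerates a missing or non-`ACTIVE` context. A minimal sketch of that pattern follows, with hypothetical names (`ActiveOnlyDispatch`, `runIfActive`, `imagesApplied`) standing in for `CoordinatorRuntime` internals: the action runs only for an `ACTIVE` context and is otherwise skipped rather than raising an error.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of "skip if missing or not active", not CoordinatorRuntime code.
final class ActiveOnlyDispatch {
    enum State { LOADING, ACTIVE, CLOSED }

    static final class Context {
        final State state;
        int imagesApplied = 0; // counts how often the action ran
        Context(State state) { this.state = state; }
    }

    private final Map<String, Context> coordinators = new HashMap<>();

    void put(String tp, State state) {
        coordinators.put(tp, new Context(state));
    }

    // Runs the action only if a context exists and is ACTIVE; returns whether it ran.
    boolean runIfActive(String tp, Consumer<Context> action) {
        Context context = coordinators.get(tp);
        if (context != null && context.state == State.ACTIVE) {
            action.accept(context);
            return true;
        }
        return false; // skipped: partition not loaded here, or not active yet
    }

    public static void main(String[] args) {
        ActiveOnlyDispatch runtime = new ActiveOnlyDispatch();
        runtime.put("__consumer_offsets-0", State.ACTIVE);
        runtime.put("__consumer_offsets-1", State.LOADING);
        String[] partitions = {"__consumer_offsets-0", "__consumer_offsets-1", "__consumer_offsets-2"};
        for (String tp : partitions) {
            boolean ran = runtime.runIfActive(tp, ctx -> ctx.imagesApplied++);
            System.out.println(tp + " -> " + (ran ? "applied" : "skipped"));
        }
    }
}
```

Whether a skip or an exception is the right outcome depends on the caller, which is exactly the question raised in the review.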
[PR] MINOR: Add 3.7.0 to system tests [kafka]
stanislavkozlovski opened a new pull request, #15436:
URL: https://github.com/apache/kafka/pull/15436

As per the [release instructions](https://cwiki.apache.org/confluence/display/KAFKA/Release+Process), bumping the versions in the associated files here as part of the 3.7 release
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jolshan commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503400099

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
## @@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
     }
     /**
-     * Returns the next {{@link Event}} available. This method block indefinitely until
-     * one event is ready or the accumulator is closed.
+     * Returns the next {{@link Event}} available or null if no event is
+     * available.
      *
-     * @return The next event.
+     * @return The next event available or null.
      */
     public T poll() {
-        return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+        lock.lock();
+        try {
+            K key = randomKey();
+            if (key == null) return null;
+
+            Queue queue = queues.get(key);
+            T event = queue.poll();
+
+            if (queue.isEmpty()) queues.remove(key);
+            inflightKeys.add(key);
+            size--;
+
+            return event;
+        } finally {
+            lock.unlock();
+        }
     }
     /**
-     * Returns the next {{@link Event}} available. This method blocks for the provided
-     * time and returns null of not event is available.
+     * Returns the next {{@link Event}} available. This method blocks until an
+     * event is available or the thread is interrupted.
      *
-     * @param timeout The timeout.
-     * @param unit The timeout unit.
      * @return The next event available or null.
      */
-    public T poll(long timeout, TimeUnit unit) {
+    public T take() {
         lock.lock();
         try {
             K key = randomKey();
-            long nanos = unit.toNanos(timeout);
-            while (key == null && !closed && nanos > 0) {
+            while (key == null && !closed) {
                 try {
-                    nanos = condition.awaitNanos(nanos);

Review Comment:
I think the idea was to simplify it. Since nanos was always 0 or max value, there's no need to specify specific values. Just have two methods.
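The split discussed here, a non-blocking `poll()` that returns null and a blocking `take()`, can be sketched with a `ReentrantLock` and a `Condition`. This is a simplified, hypothetical `KeyedAccumulator`, not Kafka's `EventAccumulator`: events are queued per key, at most one event per key is in flight until the hypothetical `done(key)` is called, and `take()` waits with `Condition.await()` instead of `awaitNanos`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified keyed accumulator; not Kafka's EventAccumulator.
final class KeyedAccumulator<K, T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Map<K, Queue<T>> queues = new HashMap<>();
    private final Set<K> inflightKeys = new HashSet<>();
    private boolean closed = false;

    void add(K key, T event) {
        lock.lock();
        try {
            queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
            condition.signalAll(); // wake any thread blocked in take()
        } finally {
            lock.unlock();
        }
    }

    void done(K key) { // releases a key so its next queued event becomes eligible
        lock.lock();
        try {
            inflightKeys.remove(key);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    T poll() { // non-blocking: next eligible event, or null if there is none
        lock.lock();
        try {
            return next();
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException { // blocks; returns null once closed with nothing eligible
        lock.lock();
        try {
            T event = next();
            while (event == null && !closed) {
                condition.await(); // no remaining-time bookkeeping, unlike awaitNanos(...)
                event = next();
            }
            return event;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private T next() { // caller holds the lock; picks a random queued key that is not in flight
        List<K> eligible = new ArrayList<>();
        for (K key : queues.keySet()) {
            if (!inflightKeys.contains(key)) eligible.add(key);
        }
        if (eligible.isEmpty()) return null;
        K key = eligible.get(ThreadLocalRandom.current().nextInt(eligible.size()));
        Queue<T> queue = queues.get(key);
        T event = queue.poll();
        if (queue.isEmpty()) queues.remove(key);
        inflightKeys.add(key);
        return event;
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedAccumulator<String, String> acc = new KeyedAccumulator<>();
        System.out.println(acc.poll()); // null: nothing queued, returns immediately
        acc.add("tp-0", "e1");
        acc.add("tp-0", "e2");
        System.out.println(acc.take()); // e1
        System.out.println(acc.poll()); // null: tp-0 is still in flight
        acc.done("tp-0");
        System.out.println(acc.take()); // e2
    }
}
```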
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503398332

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
## @@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
     }
     /**
-     * Returns the next {{@link Event}} available. This method block indefinitely until
-     * one event is ready or the accumulator is closed.
+     * Returns the next {{@link Event}} available or null if no event is
+     * available.
      *
-     * @return The next event.
+     * @return The next event available or null.
      */
     public T poll() {
-        return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+        lock.lock();
+        try {
+            K key = randomKey();
+            if (key == null) return null;
+
+            Queue queue = queues.get(key);
+            T event = queue.poll();
+
+            if (queue.isEmpty()) queues.remove(key);
+            inflightKeys.add(key);
+            size--;
+
+            return event;
+        } finally {
+            lock.unlock();
+        }
     }
     /**
-     * Returns the next {{@link Event}} available. This method blocks for the provided
-     * time and returns null of not event is available.
+     * Returns the next {{@link Event}} available. This method blocks until an
+     * event is available or the thread is interrupted.
      *
-     * @param timeout The timeout.
-     * @param unit The timeout unit.
      * @return The next event available or null.
      */
-    public T poll(long timeout, TimeUnit unit) {
+    public T take() {
         lock.lock();
         try {
             K key = randomKey();
-            long nanos = unit.toNanos(timeout);
-            while (key == null && !closed && nanos > 0) {
+            while (key == null && !closed) {
                 try {
-                    nanos = condition.awaitNanos(nanos);

Review Comment:
It seems to me that the purpose of this PR is to remove this. How much worse is using awaitNanos compared to await? I can imagine a subtle impact, but I'd like to know your expectation.
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503396742

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
## @@ -53,53 +50,19 @@
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-    private static class MockEventAccumulator extends EventAccumulator {
+    private static class DelayEventAccumulator extends EventAccumulator {
         private final Time time;
-        private final Queue events;
-        private final long timeToPollMs;
-        private final AtomicBoolean isClosed;
+        private final long takeDelayMs;
-        public MockEventAccumulator(Time time, long timeToPollMs) {
+        public DelayEventAccumulator(Time time, long takeDelayMs) {
             this.time = time;
-            this.events = new LinkedList<>();
-            this.timeToPollMs = timeToPollMs;
-            this.isClosed = new AtomicBoolean(false);
+            this.takeDelayMs = takeDelayMs;
         }
         @Override
-        public CoordinatorEvent poll() {
-            synchronized (events) {
-                while (events.isEmpty() && !isClosed.get()) {
-                    try {
-                        events.wait();
-                    } catch (Exception ignored) {
-
-                    }
-                }
-                time.sleep(timeToPollMs);
-                return events.poll();
-            }
-        }
-
-        @Override
-        public CoordinatorEvent poll(long timeout, TimeUnit unit) {
-            return null;
-        }
-
-        @Override
-        public void add(CoordinatorEvent event) throws RejectedExecutionException {
-            synchronized (events) {
-                events.add(event);
-                events.notifyAll();
-            }
-        }
-
-        @Override
-        public void close() {
-            isClosed.set(true);
-            synchronized (events) {
-                events.notifyAll();
-            }
+        public CoordinatorEvent take() {

Review Comment:
Thanks for the simplification!
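The new test double extends the real accumulator and overrides only `take()` to add a delay, instead of re-implementing `add`, `poll` and `close`. The same idea in a self-contained form, with `LinkedBlockingQueue` standing in for `EventAccumulator` and a hypothetical `FakeClock` standing in for Kafka's `MockTime`; here the delay is applied after the element is taken, which is a choice of this sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: LinkedBlockingQueue stands in for EventAccumulator and
// FakeClock is a hypothetical stand-in for a mock time source.
final class DelayedTakeQueueDemo {

    // Manual clock: sleep() advances simulated time instead of blocking.
    static final class FakeClock {
        private long nowMs = 0;
        void sleep(long ms) { nowMs += ms; }
        long milliseconds() { return nowMs; }
    }

    // Inherits add/poll/size from the real queue; only take() is changed.
    static final class DelayTakeQueue<E> extends LinkedBlockingQueue<E> {
        private final FakeClock clock;
        private final long takeDelayMs;

        DelayTakeQueue(FakeClock clock, long takeDelayMs) {
            this.clock = clock;
            this.takeDelayMs = takeDelayMs;
        }

        @Override
        public E take() throws InterruptedException {
            E element = super.take(); // real blocking behaviour is inherited
            clock.sleep(takeDelayMs); // then simulate a slow dequeue
            return element;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeClock clock = new FakeClock();
        DelayTakeQueue<String> queue = new DelayTakeQueue<>(clock, 500L);
        queue.add("event-1");
        queue.add("event-2");
        queue.take();
        queue.take();
        System.out.println("simulated time: " + clock.milliseconds() + " ms"); // 1000 ms
    }
}
```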
[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820880#comment-17820880 ]

A. Sophie Blee-Goldman commented on KAFKA-16277:

FYI I added you as a contributor, so you should be able to self-assign tickets from now on. Thanks again for contributing a fix!

> CooperativeStickyAssignor does not spread topics evenly among consumer group
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Cameron Redpath
> Assignee: Cameron Redpath
> Priority: Major
> Fix For: 3.8.0
> Attachments: image-2024-02-19-13-00-28-306.png
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>
> Of note, `topic-1` gets approximately 10 times more messages through it than `topic-2`.
>
> Both of these topics are consumed by a single application, single consumer group, which scales under load. Each member of the consumer group subscribes to both topics. The `partition.assignment.strategy` being used is `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application may start with one consumer. It consumes all partitions from both topics.
>
> The problem begins when the application scales up to two consumers. What is seen is that all partitions from `topic-1` go to one consumer, and all partitions from `topic-2` go to the other consumer. In the case with one topic receiving more messages than the other, this results in a very imbalanced group where one consumer is receiving 10x the traffic of the other due to partition assignment.
>
> This is the issue being seen in our cluster at the moment. See this graph of the number of messages being processed by each consumer as the group scales from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
>
> Things to note from this graphic:
> * With two consumers, the partitions for a topic all go to a single consumer each
> * With three consumers, the partitions for a topic are split between two consumers each
> * With four consumers, the partitions for a topic are split between three consumers each
> * The total number of messages being processed by each consumer in the group is very imbalanced throughout the entire period
>
> With regard to the number of _partitions_ being assigned to each consumer, the group is balanced. However, the assignment appears to be biased so that partitions from the same topic go to the same consumer. In our scenario, this leads to very undesirable partition assignment.
>
> I question whether the behaviour of the assignor should be revised, so that each topic has its partitions maximally spread across all available members of the consumer group. In the above scenario, this would result in a much more even distribution of load. The behaviour would then be:
> * With two consumers, 6 partitions from each topic go to each consumer
> * With three consumers, 4 partitions from each topic go to each consumer
> * With four consumers, 3 partitions from each topic go to each consumer
>
> Of note, we only saw this behaviour after migrating to the `CooperativeStickyAssignor`. It was not an issue with the default partition assignment strategy.
>
> It is possible this is intended behaviour, in which case, what is the preferred workaround for our scenario? If we decide to go ahead with the update to `CooperativeStickyAssignor`, our current workaround may be to limit our consumers so they only subscribe to one topic, and have two consumer threads per instance of the application.
[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-16277.
Resolution: Fixed
[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-16277:
Fix Version/s: 3.8.0
[jira] [Assigned] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16277: -- Assignee: Cameron Redpath > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Cameron Redpath >Assignee: Cameron Redpath >Priority: Major > Fix For: 3.8.0 > > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! 
> Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question whether the behaviour of the assignor should be revised so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in a much more even > distribution of load. The behaviour would then be: > * With two consumers, 6 partitions from each topic go to each consumer > * With three consumers, 4 partitions from each topic go to each consumer > * With four consumers, 3 partitions from each topic go to each consumer > > Of note, we only saw this behaviour after migrating to the > `CooperativeStickyAssignor`. It was not an issue with the default partition > assignment strategy. > > It is possible this is intended behaviour. If so, what is the > preferred workaround for our scenario? If we decide to go ahead with the > migration to `CooperativeStickyAssignor`, our current workaround may be to limit our > consumers so that each subscribes to only one topic, and to run two consumer threads > per instance of the application.
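The spread the reporter asks for (6, 4, or 3 partitions of each 12-partition topic for 2, 3, or 4 consumers) can be illustrated with a minimal Java sketch. It is an illustration under stated assumptions, not `CooperativeStickyAssignor`'s algorithm: it deals each topic's partitions round-robin across the group and ignores previous ownership, which a sticky assignor has to respect. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the assignment the reporter asks for: each topic's
// partitions are dealt round-robin across all consumers. This is NOT
// CooperativeStickyAssignor's algorithm; it ignores previous ownership entirely.
class PerTopicSpreadSketch {

    /** Returns consumer -> owned partitions, written as "topic-partition" strings. */
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Rotate the starting consumer so leftover partitions (when a topic's
        // partition count is not a multiple of the group size) do not always
        // land on the same member.
        int start = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int count = topic.getValue();
            for (int p = 0; p < count; p++) {
                String consumer = consumers.get((start + p) % consumers.size());
                assignment.get(consumer).add(topic.getKey() + "-" + p);
            }
            start = (start + count) % consumers.size();
        }
        return assignment;
    }

    /** Counts partitions of {@code topic} in {@code owned} (assumes no topic name is a "name-" prefix of another). */
    static long countForTopic(List<String> owned, String topic) {
        return owned.stream().filter(tp -> tp.startsWith(topic + "-")).count();
    }
}
```

With `topic-1` and `topic-2` at 12 partitions each and two consumers, every consumer ends up with 6 partitions of each topic, so the 10x traffic difference between the topics is shared rather than concentrated on one member.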
Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]
ableegoldman commented on PR #15416: URL: https://github.com/apache/kafka/pull/15416#issuecomment-1965331000 Test failures are unrelated, merged to trunk. Thanks for the fix!
Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]
ableegoldman merged PR #15416: URL: https://github.com/apache/kafka/pull/15416
Re: [PR] KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning [kafka]
ableegoldman commented on code in PR #15416: URL: https://github.com/apache/kafka/pull/15416#discussion_r1503326278 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -663,6 +663,7 @@ private void assignOwnedPartitions() { String consumer = consumerEntry.getKey(); Review Comment: Fair enough. Thanks for taking a look.
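The PR title suggests the fix changes the order in which a consumer's owned partitions are considered during reassignment. The following Java sketch is a deliberately simplified model of why that order matters, under the assumption that a consumer over its quota keeps the first `quota` entries of its sorted owned list and gives up the rest. It is not the actual `AbstractStickyAssignor` code, and `OwnedPartitionSortSketch` and its nested `TopicPartition` record are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model, NOT the real AbstractStickyAssignor: a consumer that owns more
// than its quota keeps the first `quota` partitions of its sorted owned list and
// gives up the rest, so the sort order decides which topics it keeps.
class OwnedPartitionSortSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION =
        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

    static final Comparator<TopicPartition> BY_PARTITION_THEN_TOPIC =
        Comparator.comparingInt(TopicPartition::partition).thenComparing(TopicPartition::topic);

    // Returns the partitions the consumer keeps under the given ordering.
    static List<TopicPartition> retained(List<TopicPartition> owned, int quota,
                                         Comparator<TopicPartition> order) {
        List<TopicPartition> sorted = new ArrayList<>(owned);
        sorted.sort(order);
        return new ArrayList<>(sorted.subList(0, Math.min(quota, sorted.size())));
    }

    static long countForTopic(List<TopicPartition> partitions, String topic) {
        return partitions.stream().filter(tp -> tp.topic().equals(topic)).count();
    }
}
```

In the KAFKA-16277 scenario (one consumer owning all 24 partitions, then a second member joining so each has a quota of 12), sorting by topic first keeps all of `topic-1`, while sorting by partition first keeps partitions 0 to 5 of both topics.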
[PR] MINOR: remove test constructor for PartitionAssignment [kafka]
cmccabe opened a new pull request, #15435: URL: https://github.com/apache/kafka/pull/15435 Remove the test constructor for PartitionAssignment and remove the TODO.
[jira] [Created] (KAFKA-16308) Formatting and Updating Kafka Features
Justine Olshan created KAFKA-16308: -- Summary: Formatting and Updating Kafka Features Key: KAFKA-16308 URL: https://issues.apache.org/jira/browse/KAFKA-16308 Project: Kafka Issue Type: Task Reporter: Justine Olshan Assignee: Justine Olshan As part of KIP-1022, we need to extend the storage and upgrade tools to support features other than metadata version. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-1022%3A+Formatting+and+Updating+Features
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jolshan commented on PR #15430: URL: https://github.com/apache/kafka/pull/15430#issuecomment-1965265194 Left one question -- otherwise lgtm
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jolshan commented on code in PR #15430: URL: https://github.com/apache/kafka/pull/15430#discussion_r1503291909 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -53,53 +50,19 @@ @Timeout(value = 60) public class MultiThreadedEventProcessorTest { -private static class MockEventAccumulator extends EventAccumulator { +private static class DelayEventAccumulator extends EventAccumulator { private final Time time; -private final Queue events; -private final long timeToPollMs; -private final AtomicBoolean isClosed; +private final long takeDelayMs; -public MockEventAccumulator(Time time, long timeToPollMs) { +public DelayEventAccumulator(Time time, long takeDelayMs) { this.time = time; -this.events = new LinkedList<>(); -this.timeToPollMs = timeToPollMs; -this.isClosed = new AtomicBoolean(false); +this.takeDelayMs = takeDelayMs; } @Override -public CoordinatorEvent poll() { -synchronized (events) { Review Comment: It's interesting that we basically made the real accumulator do what this mock was doing. But it also makes sense.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1503266038 ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List lastKnownElr() { Review Comment: And likewise here: ```suggestion public List lastKnownEligibleLeaderReplicas() { ``` ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { Review Comment: Would you consider using the full name for the uninitiated? ```suggestion public List eligibleLeaderReplicas() { ```
[jira] [Commented] (KAFKA-15695) Local log start offset is not updated on the follower after rebuilding remote log auxiliary state
[ https://issues.apache.org/jira/browse/KAFKA-15695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820852#comment-17820852 ] Henry Cai commented on KAFKA-15695: --- Can this fix be backported to the 3.6.x branch? > Local log start offset is not updated on the follower after rebuilding remote > log auxiliary state > - > > Key: KAFKA-15695 > URL: https://issues.apache.org/jira/browse/KAFKA-15695 > Project: Kafka > Issue Type: Bug > Components: replication, Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Nikhil Ramakrishnan >Assignee: Nikhil Ramakrishnan >Priority: Major > Labels: KIP-405, tiered-storage > Fix For: 3.7.0 > > > In 3.6, the local log start offset is not updated when reconstructing the > auxiliary state of the remote log on a follower. > The impact of this bug is significant because, if this follower becomes the > leader before the local log start offset has had a chance to be updated, > reads from any offset between [wrong log start offset; actual log start > offset] will be routed to local storage, which does not contain the > corresponding data. In this case, consumer reads will never be satisfied. > > Reproduction case 1: > # Create a cluster with 2 brokers, broker 0 and broker 1. > # Create a topic topicA with RF=1, 1 partition (topicA-0) on broker 0, and 1 > batch per segment. > # Produce 3 records to topicA, such that segment 1 and segment 2, containing the > first two records, are copied to remote storage and deleted from local storage. > # Reassign the replica to add broker 1 to the replica set for topicA-0, and > elect broker 1 as the leader. > # Try to consume from the beginning of topicA-0. > > Reproduction case 2: > # Create a cluster with 2 brokers, broker 0 and broker 1. > # Create a topic topicA with RF=2, 1 partition (topicA-0) and 2 batches per > segment, with broker 0 as the leader.
> # Stop broker 1, and produce 3 records to topicA, such that segment 1, containing > the first two records, is copied to remote storage and deleted from local storage. > # Start broker 1, and let it catch up with broker 0. > # Stop broker 0 such that broker 1 is elected as the leader, and try to > consume from the beginning of topicA-0. > Consumer reads will not be satisfied in these cases because the local log > start offset is not updated on broker 1 when it builds the auxiliary state of > the remote log segments.
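The failure mode reduces to a routing decision: a leader serves offsets at or above its recorded local log start offset from local disk and older offsets from remote storage. The Java sketch below is a hypothetical model of that decision, not Kafka's fetch path. It assumes, following reproduction case 1, that offsets 0 and 1 have been tiered and deleted locally (so the actual local log start offset is 2) and that the stale value left on the new leader is 0.

```java
// Hypothetical model of the KAFKA-15695 failure mode, not Kafka's fetch path.
class TieredReadRoutingSketch {
    enum Source { LOCAL, REMOTE }

    // A leader treats offsets at or above its *recorded* local log start
    // offset as available on local disk; older offsets go to remote storage.
    static Source route(long fetchOffset, long recordedLocalLogStartOffset) {
        return fetchOffset >= recordedLocalLogStartOffset ? Source.LOCAL : Source.REMOTE;
    }

    // A LOCAL read only succeeds if the data is really on local disk, i.e. the
    // offset is at or above the *actual* local log start offset. Remote reads
    // are assumed to succeed for tiered offsets.
    static boolean canServe(long fetchOffset, long recordedLocalLogStartOffset,
                            long actualLocalLogStartOffset) {
        Source source = route(fetchOffset, recordedLocalLogStartOffset);
        return source == Source.REMOTE || fetchOffset >= actualLocalLogStartOffset;
    }
}
```

With the stale value, a fetch from offset 0 is routed to local storage, which no longer holds it, so `canServe(0, 0, 2)` is false; with the correct value of 2 the same fetch is routed to remote storage and can be served.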
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,25 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +public DescribeTopicsOptions useDescribeTopicsApi(boolean useDescribeTopicsApi) { +this.useDescribeTopicsApi = useDescribeTopicsApi; +return this; +} Review Comment: Can we add some comments here for a developer to know _why_ to use the topics API or not? ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,36 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +public String toString() { +return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; Review Comment: ```suggestion return "RecurringCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; ``` ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. 
+ */ +public List lastKnownElr() { +return lastKnownElr; +} + public String toString() { return "(partition=" + partition + ", leader=" + leader + ", replicas=" + -Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")"; +Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + +Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")"; Review Comment: ```suggestion Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ", elr=" + Utils.join(elr, ", ") + ", lastKnownElr=" + Utils.join(lastKnownElr, ", ") + ")"; ``` ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,36 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +public String toString() { +return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; +} + +public void run() { +try { +do { +nextRun = new KafkaFutureImpl<>(); +Call call = generateCall(); +runnable.call(call, time.milliseconds()); +} while (nextRun.get()); +} catch (Exception e) { +log.info("Stop the recurring call " + name + " because " + e); Review Comment: Are we specifically wanting to avoid outputting a stack trace? 
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) { public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) { if (topics instanceof TopicIdCollection) return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); -else if (topics instanceof TopicNameCollection) +else if (topics instanceof TopicNameCollection) { +if (options.useDescribeTopicsApi()) { +return DescribeTopicsResult.ofTopicNameIterator(new DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), options)); Review Comment: It's been my experience that it's "dangerous" to run arbitrary user code from within the context of the client code. User code can (and will) do unpredictable things with state, errors, threads, etc. The surrounding code inside the client has to be very careful to make sure it
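The `RecurringCall` loop in the diff re-issues a call for as long as the previous call completes `nextRun` with `true`. A minimal, synchronous Java sketch of that control flow follows, with `CompletableFuture<Boolean>` standing in for `KafkaFutureImpl` and a hypothetical cursor over an in-memory topic list standing in for paginated DescribeTopicPartitions responses. In the real client the call completes asynchronously on the admin client's network thread; `RecurringCallSketch`, `pageSize`, and the cursor logic are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of the RecurringCall loop: each iteration issues one call,
// and the call completes `nextRun` with true if another page must be fetched.
class RecurringCallSketch {
    private final List<String> topics;   // stand-in for the broker-side topic list
    private final int pageSize;          // hypothetical per-response limit
    private final List<String> received = new ArrayList<>();
    private int cursor = 0;              // index of the first topic not yet fetched
    private int callCount = 0;
    private CompletableFuture<Boolean> nextRun;

    RecurringCallSketch(List<String> topics, int pageSize) {
        if (pageSize <= 0) throw new IllegalArgumentException("pageSize must be positive");
        this.topics = topics;
        this.pageSize = pageSize;
    }

    // Stand-in for generateCall() + runnable.call(): fetches one page synchronously.
    private void issueCall() {
        callCount++;
        int end = Math.min(cursor + pageSize, topics.size());
        received.addAll(topics.subList(cursor, end));
        cursor = end;
        nextRun.complete(cursor < topics.size()); // true => schedule another call
    }

    List<String> run() throws InterruptedException, ExecutionException {
        do {
            nextRun = new CompletableFuture<>();
            issueCall();
        } while (nextRun.get()); // blocks until the current call has finished
        return received;
    }

    int callCount() {
        return callCount;
    }
}
```

Because `nextRun.get()` blocks until each call finishes, the loop has at most one request outstanding at a time; five topics with a page size of 2 take three calls.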
[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820821#comment-17820821 ] Justine Olshan commented on KAFKA-16282: Thanks [~ahmedsobeh]. A few things: For the proposed Admin.java change, I suggest picking one of the options and listing the other as a rejected alternative. We may go with one or the other, but it is good to take a position that folks can agree or disagree with. For the compatibility section, we could have compatibility issues if someone uses this new field on a broker that doesn't support it. We should probably throw an error in that case. Generally though, I think the KIP is ready for discussion; I may have further comments for you on the mailing list :)
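For context on the "latest" semantics discussed in KAFKA-16282: in a simplified model (an assumption for illustration, not broker code), the last stable offset is the high watermark capped at the first offset of the earliest still-open transaction, so it never exceeds the high watermark. The Java sketch below encodes that rule; the class and method names are hypothetical. On the client side, the Admin API's `ListOffsetsOptions` already has a constructor that takes an `IsolationLevel`, which is presumably what the tool would pass for `READ_COMMITTED`.

```java
import java.util.Collection;

// Simplified model (an assumption for illustration): the LSO is the high
// watermark capped at the first offset of the earliest still-open transaction.
class LastStableOffsetSketch {

    static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    // "latest" under READ_UNCOMMITTED maps to the high watermark,
    // under READ_COMMITTED to the last stable offset.
    static long latestOffset(boolean readCommitted, long highWatermark,
                             Collection<Long> openTxnFirstOffsets) {
        return readCommitted ? lastStableOffset(highWatermark, openTxnFirstOffsets) : highWatermark;
    }
}
```

With a high watermark of 100 and an open transaction starting at offset 40, `READ_UNCOMMITTED` reports 100 while `READ_COMMITTED` reports 40; with no open transactions both report 100.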
Re: [PR] KAFKA-16305: Avoid optimisation in handshakeUnwrap [kafka]
ijuma commented on code in PR #15434: URL: https://github.com/apache/kafka/pull/15434#discussion_r1503149170 ## clients/src/test/java/org/apache/kafka/common/security/ssl/NettySslEngineFactory.java: ## @@ -0,0 +1,159 @@ +package org.apache.kafka.common.security.ssl; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.utils.SecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; +import java.security.KeyStore; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class NettySslEngineFactory extends DefaultSslEngineFactory { Review Comment: I would expect a unit test rather than a test that adds a third-party dependency.
[jira] [Commented] (KAFKA-9062) Handle stalled writes to RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820819#comment-17820819 ] Matthias J. Sax commented on KAFKA-9062: Given that bulk loading was disabled, should we close this ticket? > Handle stalled writes to RocksDB > > > Key: KAFKA-9062 > URL: https://issues.apache.org/jira/browse/KAFKA-9062 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: new-streams-runtime-should-fix > > RocksDB may stall writes at times when background compactions or flushes are > having trouble keeping up. This means we can effectively end up blocking > indefinitely during a StateStore#put call within Streams, and may get kicked > from the group if the throttling does not ease up within the max poll > interval. > Example: when restoring large amounts of state from scratch, we use the > strategy recommended by RocksDB of turning off automatic compactions and > dumping everything into L0. We do batch somewhat, but do not sort these small > batches before loading into the db, so we end up with a large number of > unsorted L0 files. > When restoration is complete and we toggle the db back to normal (not bulk > loading) settings, a background compaction is triggered to merge all these > into the next level. This background compaction can take a long time to merge > unsorted keys, especially when the amount of data is quite large. > Any new writes while the number of L0 files exceeds the max will be stalled > until the compaction can finish, and processing after restoring from scratch > can block beyond the polling interval.
[jira] [Assigned] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations
[ https://issues.apache.org/jira/browse/KAFKA-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Liauchuk reassigned KAFKA-14919: -- Assignee: Anton Liauchuk > MM2 ForwardingAdmin tests should not conflate admin operations > -- > > Key: KAFKA-14919 > URL: https://issues.apache.org/jira/browse/KAFKA-14919 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Greg Harris >Assignee: Anton Liauchuk >Priority: Minor > Labels: newbie > > The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special > implementation of ForwardingAdmin which records admin operations in a static > ConcurrentMap, which is then used to perform assertions. > This has the problem that one variable (allTopics) is used to perform > assertions for multiple different methods (adding topics, adding partitions, > and syncing configs), despite these operations each being tested separately. > This leads to the confusing behavior where each test appears to assert that a > particular operation has taken place, but instead asserts that at least one > of the operations has taken place. This allows a regression or timeout in one > operation to be hidden by the others, making the behavior of the tests much > less predictable. > These tests and/or the metadata store should be changed so that the tests are > isolated from one another, and actually perform the assertions that > correspond to their titles.
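One way to make each test assert only on its own operation is to record each admin operation separately. The Java sketch below is hypothetical (the class and method names are not from the MM2 code base) and leaves out how the store is shared with the `ForwardingAdmin` instance that MirrorMaker creates, which the current test does through a static map. `anyOperation` shows what a single shared collection such as `allTopics` effectively checks.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: keep one record per admin operation instead of a single
// shared collection, so each test can assert on exactly the operation it covers.
class RecordingAdminStore {
    private final Set<String> createdTopics = ConcurrentHashMap.newKeySet();
    private final Set<String> topicsWithNewPartitions = ConcurrentHashMap.newKeySet();
    private final Set<String> topicsWithSyncedConfigs = ConcurrentHashMap.newKeySet();

    void recordCreateTopic(String topic) { createdTopics.add(topic); }
    void recordCreatePartitions(String topic) { topicsWithNewPartitions.add(topic); }
    void recordAlterConfigs(String topic) { topicsWithSyncedConfigs.add(topic); }

    boolean topicCreated(String topic) { return createdTopics.contains(topic); }
    boolean partitionsAdded(String topic) { return topicsWithNewPartitions.contains(topic); }
    boolean configsSynced(String topic) { return topicsWithSyncedConfigs.contains(topic); }

    // What a single conflated record effectively answers:
    // "did ANY of the operations touch this topic?"
    boolean anyOperation(String topic) {
        Set<String> all = new HashSet<>(createdTopics);
        all.addAll(topicsWithNewPartitions);
        all.addAll(topicsWithSyncedConfigs);
        return all.contains(topic);
    }
}
```

After only a topic creation, `anyOperation` is already true for that topic, which is how a missing partition or config sync can go unnoticed; `partitionsAdded` and `configsSynced` correctly stay false.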
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820801#comment-17820801 ] Chaitanya Mukka commented on KAFKA-16223: - Ah, sure. Let me do that. The tests are already fairly modular, but creating a new class may also help us track progress. > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor
Re: [PR] MINOR: Optimize EventAccumulator [kafka]
jolshan commented on code in PR #15430: URL: https://github.com/apache/kafka/pull/15430#discussion_r1503073175 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java: ## @@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException { } /** - * Returns the next {{@link Event}} available. This method block indefinitely until - * one event is ready or the accumulator is closed. + * Returns the next {{@link Event}} available or null if no event is + * available. * - * @return The next event. + * @return The next event available or null. */ public T poll() { -return poll(Long.MAX_VALUE, TimeUnit.SECONDS); +lock.lock(); +try { +K key = randomKey(); +if (key == null) return null; + +Queue queue = queues.get(key); +T event = queue.poll(); + +if (queue.isEmpty()) queues.remove(key); +inflightKeys.add(key); +size--; + +return event; +} finally { +lock.unlock(); +} } /** - * Returns the next {{@link Event}} available. This method blocks for the provided - * time and returns null of not event is available. + * Returns the next {{@link Event}} available. This method blocks until an + * event is available or the thread is interrupted. Review Comment: I noticed we just catch interrupted exceptions. Is that correct? Or should we adjust the comment to say "or the accumulator is closed"?
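The distinction the diff draws between a non-blocking `poll()` (return null when nothing is ready) and a blocking variant can be sketched as follows. This is a simplified Java model, not Kafka's `EventAccumulator`: the names are hypothetical, the random key choice mirrors `randomKey()` from the diff, and `done(key)` is an assumed stand-in for however the real runtime releases an in-flight key. In this sketch the blocking `take()` returns an event when one arrives, returns null once the accumulator is closed, and propagates `InterruptedException` if the waiting thread is interrupted, which are the two exit conditions the review comment asks about.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch, NOT Kafka's EventAccumulator: events are queued per key, and
// a key stays "in flight" (ineligible) until done(key) is called, so events for
// the same key are never handed out concurrently.
class KeyedAccumulatorSketch<K, T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final Map<K, Queue<T>> queues = new HashMap<>();
    private final Set<K> inflightKeys = new HashSet<>();
    private final Random random = new Random();
    private boolean closed = false;

    void add(K key, T event) { // event must be non-null (ArrayDeque rejects null)
        lock.lock();
        try {
            if (closed) throw new IllegalStateException("accumulator is closed");
            queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Non-blocking: returns the next available event, or null if there is none. */
    T poll() {
        lock.lock();
        try {
            return pollLocked();
        } finally {
            lock.unlock();
        }
    }

    /** Blocking: waits for an event; returns null once closed and nothing is available. */
    T take() throws InterruptedException {
        lock.lock();
        try {
            T event;
            while ((event = pollLocked()) == null) {
                if (closed) return null;
                changed.await(); // throws InterruptedException if interrupted
            }
            return event;
        } finally {
            lock.unlock();
        }
    }

    /** Marks the in-flight event for this key as processed. */
    void done(K key) {
        lock.lock();
        try {
            inflightKeys.remove(key);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock. Picks a random key that has events and is not in flight.
    private T pollLocked() {
        List<K> candidates = new ArrayList<>();
        for (K key : queues.keySet()) {
            if (!inflightKeys.contains(key)) candidates.add(key);
        }
        if (candidates.isEmpty()) return null;
        K key = candidates.get(random.nextInt(candidates.size()));
        Queue<T> queue = queues.get(key);
        T event = queue.poll();
        if (queue.isEmpty()) queues.remove(key);
        inflightKeys.add(key);
        return event;
    }
}
```

Because a key stays in flight until `done(key)`, a second event for the same key is not handed out while the first is still being processed, even though it is already queued.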
Re: [PR] migrate StreamedJoinTest to Mockito [kafka]
agavra commented on code in PR #15424: URL: https://github.com/apache/kafka/pull/15424#discussion_r1503025048 ## streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/StreamJoinedTest.scala: ## @@ -21,24 +21,21 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder import org.apache.kafka.streams.scala.serialization.Serdes import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.state.Stores -import org.easymock.EasyMock -import org.easymock.EasyMock.{createMock, replay} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.Mockito import java.time.Duration class StreamJoinedTest { Review Comment: The equivalent with JUnit 5 is:
```scala
@ExtendWith(Array(classOf[MockitoExtension]))
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
```
I've added that.
[jira] [Assigned] (KAFKA-16307) fix EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-16307: --- Assignee: Jeff Kim > fix EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > > The metric does not seem to be accurate, nor reporting metrics at every > interval. Requires investigation
[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-16307: - Description: The metric does not seem to be accurate, nor reporting metrics at every interval. Requires investigation (was: The metric does not seem to be accurate. Requires investigation) > fix EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Priority: Major > > The metric does not seem to be accurate, nor reporting metrics at every > interval. Requires investigation
[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-16307: - Summary: fix EventAccumulator thread idle ratio metric (was: investigate EventAccumulator thread idle ratio metric) > fix EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Priority: Major > > The metric does not seem to
[jira] [Updated] (KAFKA-16307) investigate EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-16307: - Description: The metric does not seem to > investigate EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Priority: Major > > The metric does not seem to
[jira] [Created] (KAFKA-16307) investigate EventAccumulator thread idle ratio metric
Jeff Kim created KAFKA-16307: Summary: investigate EventAccumulator thread idle ratio metric Key: KAFKA-16307 URL: https://issues.apache.org/jira/browse/KAFKA-16307 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim
[jira] [Updated] (KAFKA-16307) fix EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-16307: - Description: The metric does not seem to be accurate. Requires investigation (was: The metric does not seem to ) > fix EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Priority: Major > > The metric does not seem to be accurate. Requires investigation
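For readers unfamiliar with the metric: a thread idle ratio is typically the fraction of a measurement window that a worker spent waiting for work. The Java sketch below is a hypothetical, single-threaded model with an injectable clock (`LongSupplier`), not the coordinator runtime's metric code; the class and method names are assumptions.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a thread idle-ratio measurement: the fraction of each
// measurement window that a worker thread spent waiting for events.
class IdleRatioSketch {
    private final LongSupplier clockMs;
    private long windowStartMs;
    private long idleMsInWindow = 0;

    IdleRatioSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.windowStartMs = clockMs.getAsLong();
    }

    // Called by the worker after a blocking wait returns, with the time it waited.
    void recordIdle(long idleMs) {
        idleMsInWindow += idleMs;
    }

    // Returns idle time / elapsed time for the current window (capped at 1.0)
    // and starts a new window.
    double sampleAndReset() {
        long now = clockMs.getAsLong();
        long elapsed = now - windowStartMs;
        double ratio = elapsed <= 0 ? 0.0 : Math.min(1.0, (double) idleMsInWindow / elapsed);
        windowStartMs = now;
        idleMsInWindow = 0;
        return ratio;
    }
}
```

Because idle time is only recorded when a wait ends, a thread that is still blocked when a window closes reports that time in a later window, which is one way such a ratio can look inaccurate or bursty; the ticket does not establish whether that is the cause here.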
[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured
[ https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Liauchuk reassigned KAFKA-16306: -- Assignee: Jeff Kim (was: Anton Liauchuk) > GroupCoordinatorService logger is not configured > > > Key: KAFKA-16306 > URL: https://issues.apache.org/jira/browse/KAFKA-16306 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Minor > > The GroupCoordinatorService constructor initializes with the wrong logger > class: > ``` > GroupCoordinatorService( > LogContext logContext, > GroupCoordinatorConfig config, > CoordinatorRuntime runtime, > GroupCoordinatorMetrics groupCoordinatorMetrics > ) { > this.log = logContext.logger(CoordinatorLoader.class); > this.config = config; > this.runtime = runtime; > this.groupCoordinatorMetrics = groupCoordinatorMetrics; > } > ``` > Change this to GroupCoordinatorService.class.
[PR] KAFKA-16306: fix GroupCoordinatorService logger [kafka]
jeffkbkim opened a new pull request, #15433: URL: https://github.com/apache/kafka/pull/15433
[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured
[ https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-16306: --- Assignee: Jeff Kim
[jira] [Assigned] (KAFKA-16306) GroupCoordinatorService logger is not configured
[ https://issues.apache.org/jira/browse/KAFKA-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Liauchuk reassigned KAFKA-16306: -- Assignee: Anton Liauchuk (was: Jeff Kim) > GroupCoordinatorService logger is not configured > > > Key: KAFKA-16306 > URL: https://issues.apache.org/jira/browse/KAFKA-16306 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Anton Liauchuk >Priority: Minor > > The GroupCoordinatorService constructor initializes with the wrong logger > class: > ``` > GroupCoordinatorService( > LogContext logContext, > GroupCoordinatorConfig config, > CoordinatorRuntime runtime, > GroupCoordinatorMetrics groupCoordinatorMetrics > ) { > this.log = logContext.logger(CoordinatorLoader.class); > this.config = config; > this.runtime = runtime; > this.groupCoordinatorMetrics = groupCoordinatorMetrics; > } > ``` > change this to GroupCoordinatorService.class -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16306) GroupCoordinatorService logger is not configured
Jeff Kim created KAFKA-16306: Summary: GroupCoordinatorService logger is not configured Key: KAFKA-16306 URL: https://issues.apache.org/jira/browse/KAFKA-16306 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim The GroupCoordinatorService constructor initializes with the wrong logger class: ``` GroupCoordinatorService( LogContext logContext, GroupCoordinatorConfig config, CoordinatorRuntime runtime, GroupCoordinatorMetrics groupCoordinatorMetrics ) { this.log = logContext.logger(CoordinatorLoader.class); this.config = config; this.runtime = runtime; this.groupCoordinatorMetrics = groupCoordinatorMetrics; } ``` change this to GroupCoordinatorService.class -- This message was sent by Atlassian Jira (v8.20.10#820010)
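The requested change is a one-argument fix: pass `GroupCoordinatorService.class` to `logContext.logger(...)`. The minimal sketch below shows why the class argument matters. To run without dependencies, it uses a hypothetical `SimpleLogContext` built on `java.util.logging` instead of Kafka's SLF4J-based `LogContext`, along with trimmed-down stand-ins for the two classes. Kafka's real `LogContext` also prepends a log prefix, which is left out here.

```java
import java.util.logging.Logger;

// Hypothetical, dependency-free stand-in for Kafka's LogContext: the logger is
// named after the class passed in, so passing the wrong class misattributes
// every log line (and any per-logger level configuration).
final class SimpleLogContext {
    Logger logger(Class<?> clazz) {
        return Logger.getLogger(clazz.getName());
    }
}

// Empty placeholder for the class whose logger was borrowed by mistake.
final class CoordinatorLoader { }

// Trimmed-down version of the constructor from the issue (other fields omitted).
final class GroupCoordinatorService {
    final Logger log;

    GroupCoordinatorService(SimpleLogContext logContext) {
        // Before the fix: logContext.logger(CoordinatorLoader.class)
        this.log = logContext.logger(GroupCoordinatorService.class);
    }
}
```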
[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820747#comment-17820747 ] Kirk True commented on KAFKA-16160: --- [~pnee], do you have some insights on how [~phuctran] can reproduce this? Thanks! > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > While running AsyncKafkaConsumer, we observe excessive logging of: > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop in the network thread. The > right thing to do is to back off for that node for a while and retry later. > This should be a blocker for the 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
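The issue proposes backing off per node rather than retrying on every pass of the network thread's poll loop. The sketch below shows one way to track that state. It assumes a hypothetical `NodeBackoff` class with exponential growth capped at a maximum, and it is not the consumer's actual implementation. Kafka clients already apply a similar idea to reconnects through `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. The values used in the test are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-node backoff tracker: after a failed attempt, the node is
// skipped until its backoff expires instead of being retried on every loop.
final class NodeBackoff {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final Map<String, Integer> failures = new HashMap<>();
    private final Map<String, Long> nextAttemptMs = new HashMap<>();

    NodeBackoff(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Returns true if a request to the node may be attempted at nowMs. */
    boolean canAttempt(String node, long nowMs) {
        return nowMs >= nextAttemptMs.getOrDefault(node, 0L);
    }

    /** Records a failure and schedules the next attempt with exponential backoff. */
    void onFailure(String node, long nowMs) {
        int attempts = failures.merge(node, 1, Integer::sum);
        // initial * 2^(attempts - 1), capped; the shift is bounded to avoid overflow.
        long backoff = Math.min(maxBackoffMs, initialBackoffMs << Math.min(attempts - 1, 30));
        nextAttemptMs.put(node, nowMs + backoff);
    }

    /** Clears the backoff state once the node becomes ready again. */
    void onSuccess(String node) {
        failures.remove(node);
        nextAttemptMs.remove(node);
    }

    long nextAttemptMs(String node) {
        return nextAttemptMs.getOrDefault(node, 0L);
    }
}
```

Keeping the state per node means a disconnected `worker4:9092` gets throttled without delaying requests to healthy brokers.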
[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820746#comment-17820746 ] Kirk True commented on KAFKA-16190: --- [~lucasbru], could you take a look at the pull request for this Jira? Thanks! > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Quoc Phong Dang >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support, newbie > Fix For: 3.8.0 > > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is reset on failure scenarios, which > should cover the fence+rejoin sequence). > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this exact case given that it does explicitly change the subscription when it > gets fenced. We should ensure we test a consumer that keeps the same initial > subscription when it rejoins after being fenced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
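The behaviour the issue asks for can be modelled as a builder that remembers which fields it last sent and forgets that state when the member is fenced, so the next heartbeat is full even when nothing has changed. A minimal sketch follows, assuming a hypothetical `HeartbeatFieldTracker` that covers only two heartbeat fields. It is not the real `HeartbeatRequestManager` code. The test exercises the case the issue says is missing: a rejoin with an unchanged subscription.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical tracker for "send only what changed" heartbeats.
final class HeartbeatFieldTracker {
    private List<String> sentSubscription;  // null means never sent, or reset
    private Integer sentRebalanceTimeoutMs; // null means never sent, or reset

    /** Returns the optional fields to include in the next heartbeat. */
    Map<String, Object> build(List<String> subscription, int rebalanceTimeoutMs) {
        Map<String, Object> fields = new LinkedHashMap<>();
        if (!Objects.equals(subscription, sentSubscription)) {
            fields.put("subscribedTopicNames", subscription);
            sentSubscription = List.copyOf(subscription);
        }
        if (!Objects.equals(rebalanceTimeoutMs, sentRebalanceTimeoutMs)) {
            fields.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return fields;
    }

    /** Called when the member is fenced or fails, so the rejoin sends everything. */
    void reset() {
        sentSubscription = null;
        sentRebalanceTimeoutMs = null;
    }
}
```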
[PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]
dajac opened a new pull request, #15432: URL: https://github.com/apache/kafka/pull/15432 I have noticed the following log when a __consumer_offsets partition migrates away from a broker. It happens because the UpdateImage event is queued up after the event that unloads the state machine, so by the time it runs, this broker is no longer the coordinator for the partition. This patch fixes it, along with a similar issue affecting the HighWatermarkUpdated event. ``` [2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime) org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator. ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
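The failure described here is an ordering race on a single event queue. The unload event runs first, and then the `UpdateImage` event for the same partition finds no loaded state. The toy model below reproduces that ordering and treats the situation as expected by logging it at debug level. The `EventQueueSketch` class, its `Event` record, and the log strings are all hypothetical. This is not the change made in #15432, only an illustration of the race.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of a coordinator event queue processed in FIFO order.
final class EventQueueSketch {
    record Event(String name, String partition, boolean unload) { }

    private final Set<String> loaded = new HashSet<>();
    private final Queue<Event> queue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void load(String partition) {
        loaded.add(partition);
    }

    void enqueue(Event event) {
        queue.add(event);
    }

    void runAll() {
        Event e;
        while ((e = queue.poll()) != null) {
            if (!loaded.contains(e.partition())) {
                // Expected race: the state machine was unloaded before this event ran.
                log.add("DEBUG Ignored " + e.name() + " for " + e.partition() + ": not loaded");
            } else if (e.unload()) {
                loaded.remove(e.partition());
                log.add("INFO Unloaded " + e.partition());
            } else {
                log.add("INFO Applied " + e.name() + " to " + e.partition());
            }
        }
    }
}
```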
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on PR #15364: URL: https://github.com/apache/kafka/pull/15364#issuecomment-1964189888 @jeffkbkim Thanks for your comments. I have addressed all of them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1502633032 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2079,253 +2056,20 @@ public void testReconciliationProcess() { assertRecordsEquals(Collections.singletonList( RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) +.setState(MemberState.STABLE) .setMemberEpoch(11) .setPreviousMemberEpoch(11) -.setTargetMemberEpoch(11) .setAssignedPartitions(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5), mkTopicAssignment(barTopicId, 1))) .build())), result.records() ); -assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId3)); +assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId3)); assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, context.consumerGroupState(groupId)); } -@Test -public void testReconciliationRestartsWhenNewTargetAssignmentIsInstalled() { Review Comment: Correct. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. 
+ * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: Yeah, that's right. 
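`maybeReconcile` follows a common pattern. It returns the same member when that member is already reconciled to the target epoch. Otherwise it runs the builder and appends a record only if the result differs. A simplified sketch of that flow follows. It assumes a hypothetical `Member` record and a single revocation rule: the member cannot advance while the client still reports owning a partition it must give up. The real `CurrentAssignmentBuilder` also tracks partition epochs and revocation states, so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReconcileSketch {
    // Hypothetical, heavily simplified member: id, epoch and assigned partitions.
    record Member(String memberId, int memberEpoch, Set<Integer> assignedPartitions) {
        boolean isReconciledTo(int targetEpoch) {
            return memberEpoch == targetEpoch;
        }
    }

    /**
     * Returns the same instance when nothing changes; otherwise returns the
     * updated member and appends one record describing the new assignment.
     */
    static Member maybeReconcile(Member member, int targetEpoch, Set<Integer> targetAssignment,
                                 Set<Integer> ownedByClient, List<String> records) {
        if (member.isReconciledTo(targetEpoch)) {
            return member; // fast path: no new member, no record
        }
        // Partitions the member must give up to reach the target assignment.
        Set<Integer> toRevoke = new HashSet<>(member.assignedPartitions());
        toRevoke.removeAll(targetAssignment);
        boolean stillOwned = toRevoke.stream().anyMatch(ownedByClient::contains);

        Member updated = stillOwned
            ? member // wait until the client reports that it released them
            : new Member(member.memberId(), targetEpoch, Set.copyOf(targetAssignment));

        if (!updated.equals(member)) {
            records.add("CurrentAssignment(" + updated.memberId() + ", epoch=" + updated.memberEpoch()
                + ", partitions=" + updated.assignedPartitions() + ")");
        }
        return updated;
    }
}
```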
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember
[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677 ] Ahmed Sobeh edited comment on KAFKA-16282 at 2/26/24 1:23 PM: -- Thanks both [~showuon] @! I just finished writing up the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh] can you please take a look and let me know if you have any comments before I send it out to the mailing list? was (Author: JIRAUSER295920): Thanks both! I just finished writing up the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh] can you please take a look and let me know if you have any comments before I send it out to the mailing list? > Allow to get last stable offset (LSO) in kafka-get-offsets.sh > - > > Key: KAFKA-16282 > URL: https://issues.apache.org/jira/browse/KAFKA-16282 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Ahmed Sobeh >Priority: Major > Labels: need-kip, newbie, newbie++ > > Currently, when using `kafka-get-offsets.sh` to get the offset by time, we > have these choices: > {code:java} > --time / timestamp of the offsets before > that. > -1 or latest / [Note: No offset is returned, if > the > -2 or earliest / timestamp greater than recently > -3 or max-timestamp /committed record timestamp is > -4 or earliest-local / given.] (default: latest) > -5 or latest-tiered > {code} > For the "latest" option, it'll always return the "high watermark" because we > always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It > would be good if the command can support to get the last stable offset (LSO) > for transaction support. That is, sending the option with > *IsolationLevel.READ_COMMITTED* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677 ] Ahmed Sobeh edited comment on KAFKA-16282 at 2/26/24 1:23 PM: -- Thanks both [~showuon] [~jolshan] ! I just finished writing up the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh] can you please take a look and let me know if you have any comments before I send it out to the mailing list? was (Author: JIRAUSER295920): Thanks both [~showuon] @! I just finished writing up the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh] can you please take a look and let me know if you have any comments before I send it out to the mailing list? > Allow to get last stable offset (LSO) in kafka-get-offsets.sh > - > > Key: KAFKA-16282 > URL: https://issues.apache.org/jira/browse/KAFKA-16282 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Ahmed Sobeh >Priority: Major > Labels: need-kip, newbie, newbie++ > > Currently, when using `kafka-get-offsets.sh` to get the offset by time, we > have these choices: > {code:java} > --time / timestamp of the offsets before > that. > -1 or latest / [Note: No offset is returned, if > the > -2 or earliest / timestamp greater than recently > -3 or max-timestamp /committed record timestamp is > -4 or earliest-local / given.] (default: latest) > -5 or latest-tiered > {code} > For the "latest" option, it'll always return the "high watermark" because we > always send with the default option: *IsolationLevel.READ_UNCOMMITTED*. It > would be good if the command can support to get the last stable offset (LSO) > for transaction support. That is, sending the option with > *IsolationLevel.READ_COMMITTED* -- This message was sent by Atlassian Jira (v8.20.10#820010)
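The distinction the issue draws can be stated precisely. Under READ_UNCOMMITTED, "latest" resolves to the high watermark. Under READ_COMMITTED, it resolves to the last stable offset: the first offset of the earliest transaction that is still open, or the high watermark when no transaction is open. The sketch below models only that rule, using a hypothetical `Isolation` enum that mirrors Kafka's `IsolationLevel`. It does not talk to a broker. On the client side, the Admin client's `ListOffsetsOptions` already accepts an `IsolationLevel` in its constructor.

```java
import java.util.Collection;
import java.util.List;

// Illustrative model of how "latest" differs by isolation level.
final class LastStableOffsetSketch {
    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    /**
     * @param highWatermark       the partition's high watermark
     * @param openTxnFirstOffsets first offset of each transaction that is still open
     */
    static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first); // an open transaction pins the LSO at its start
        }
        return lso;
    }

    static long latestOffset(Isolation isolation, long highWatermark, Collection<Long> openTxnFirstOffsets) {
        return isolation == Isolation.READ_COMMITTED
            ? lastStableOffset(highWatermark, openTxnFirstOffsets)
            : highWatermark;
    }
}
```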
Re: [PR] migrate StreamedJoinTest to Mockito [kafka]
divijvaidya commented on code in PR #15424: URL: https://github.com/apache/kafka/pull/15424#discussion_r1502590907 ## streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/StreamJoinedTest.scala: ## @@ -21,24 +21,21 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder import org.apache.kafka.streams.scala.serialization.Serdes import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.state.Stores -import org.easymock.EasyMock -import org.easymock.EasyMock.{createMock, replay} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.Mockito import java.time.Duration class StreamJoinedTest { Review Comment: Please add the annotation `@RunWith(MockitoJUnitRunner.StrictStubs.class)` and resolve any errors that it may surface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
[ https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula updated KAFKA-16305: -- Summary: Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake (was: # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake) > Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake > -- > > Key: KAFKA-16305 > URL: https://issues.apache.org/jira/browse/KAFKA-16305 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > > Kafka allows users to configure custom {{SSLEngine}} via the > {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL > based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation > for performance reasons. > While trying to use a Netty/Openssl based SSLEngine, we observe that the > server hangs while performing the TLS handshake. We observe the following > logs > {code} > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] > Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] > XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, > netWriteBuffer pos 0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId > 127.0.0.1:60045-127.0.0.1:60046-0 doRead true > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, 
readyOps=0] SSLHandshake post handshakeUnwrap: channelId > 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status > BUFFER_UNDERFLOW read 0 > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW > HandshakeStatus = NEED_UNWRAP
[jira] [Created] (KAFKA-16305) # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
Gaurav Narula created KAFKA-16305: - Summary: # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake Key: KAFKA-16305 URL: https://issues.apache.org/jira/browse/KAFKA-16305 Project: Kafka Issue Type: Bug Affects Versions: 3.6.1, 3.7.0 Reporter: Gaurav Narula Kafka allows users to configure custom {{SSLEngine}} via the {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation for performance reasons. While trying to use a Netty/Openssl based SSLEngine, we observe that the server hangs while performing the TLS handshake. We observe the following logs {code} 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 
key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 2024-02-26 01:40:00,117 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 127.0.0.1:60045-127.0.0.1:60046-0 doRead true 2024-02-26 01:40:00,118 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status BUFFER_UNDERFLOW read 0 2024-02-26 01:40:00,118 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], 
selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 handshakeStatus NEED_UNWRAP 2024-02-26 01:40:00,118 data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] SslTransportLayer- [SslTransportLayer channelId=127.0.0.1:60045-127.0.0.1:60046-0 key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId
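The trace ends with `unwrap` returning `BUFFER_UNDERFLOW` and `bytesConsumed = 0` while the handshake status stays `NEED_UNWRAP`. Under the `javax.net.ssl.SSLEngine` contract, `BUFFER_UNDERFLOW` means there were not enough inbound bytes to decode a complete TLS record. The caller therefore has to read more from the socket, keeping read interest registered (`interestOps=1` is `OP_READ`), before unwrapping again. The sketch below maps each `SSLEngineResult.Status` to that next step. It is a generic illustration with a hypothetical `Action` enum, not Kafka's `SslTransportLayer` code. It also does not identify which optimisation causes the stall, because the report is cut off at this point.

```java
import javax.net.ssl.SSLEngineResult;

// Generic mapping from an unwrap() result status to what a non-blocking
// caller should do next.
final class UnwrapStatusSketch {
    enum Action { CONTINUE, READ_MORE_FROM_SOCKET, ENLARGE_APP_BUFFER, CLOSE }

    static Action afterUnwrap(SSLEngineResult.Status status) {
        switch (status) {
            case OK:
                return Action.CONTINUE;
            case BUFFER_UNDERFLOW:
                // Not enough bytes for a full TLS record: compact the network buffer,
                // wait until the socket is readable, read, then unwrap again.
                return Action.READ_MORE_FROM_SOCKET;
            case BUFFER_OVERFLOW:
                // Destination too small: grow it to at least
                // engine.getSession().getApplicationBufferSize().
                return Action.ENLARGE_APP_BUFFER;
            case CLOSED:
                return Action.CLOSE;
            default:
                throw new IllegalStateException("Unexpected status " + status);
        }
    }
}
```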
[jira] [Assigned] (KAFKA-16305) # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
[ https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula reassigned KAFKA-16305: - Assignee: Gaurav Narula > # Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake > > > Key: KAFKA-16305 > URL: https://issues.apache.org/jira/browse/KAFKA-16305 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > > Kafka allows users to configure custom {{SSLEngine}} via the > {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL > based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation > for performance reasons. > While trying to use a Netty/Openssl based SSLEngine, we observe that the > server hangs while performing the TLS handshake. We observe the following > logs > {code} > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] > Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] > XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady 
false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, > netWriteBuffer pos 0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId > 127.0.0.1:60045-127.0.0.1:60046-0 doRead true > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId > 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status > 
BUFFER_UNDERFLOW read 0 > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW > HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, > appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 > handshakeStatus
Re: [PR] KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster [kafka]
msn-tldr commented on code in PR #15385: URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721 ## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ## @@ -1260,6 +1265,103 @@ else if (partition.equals(internalPart)) Mockito.reset(mockListener); } +/** + * Test that concurrently updating Metadata, and fetching the corresponding MetadataSnapshot & Cluster work as expected, i.e. + * snapshot & cluster contain the relevant updates. + */ +@Test +public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException { +Time time = new MockTime(); +metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()); + +// Setup metadata with 10 nodes, 2 topics, topic1 & 2, both to be retained in the update. Both will have leader-epoch 100. +int oldNodeCount = 10; +String topic1 = "test_topic1"; +String topic2 = "test_topic2"; +TopicPartition topic1Part0 = new TopicPartition(topic1, 0); +Map<String, Integer> topicPartitionCounts = new HashMap<>(); +int oldPartitionCount = 1; +topicPartitionCounts.put(topic1, oldPartitionCount); +topicPartitionCounts.put(topic2, oldPartitionCount); +Map<String, Uuid> topicIds = new HashMap<>(); +topicIds.put(topic1, Uuid.randomUuid()); +topicIds.put(topic2, Uuid.randomUuid()); +int oldLeaderEpoch = 100; +MetadataResponse metadataResponse = +RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds); +metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); +MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot(); +Cluster cluster = metadata.fetch(); +// Validate metadata snapshot & cluster are setup as expected.
+assertEquals(cluster, snapshot.cluster()); +assertEquals(oldNodeCount, snapshot.cluster().nodes().size()); +assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1)); +assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2)); +assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0)); + +// Setup 6 threads, where 3 are updating metadata & 3 are reading snapshot/cluster. +// Metadata will be updated with higher # of nodes, partition-counts, leader-epoch. +int numThreads = 6; +ExecutorService service = Executors.newFixedThreadPool(numThreads); Review Comment: done at the end of the test
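The concurrency pattern in the test above (a fixed pool of 6 threads, 3 updating and 3 reading) can be sketched without Kafka. In this minimal sketch, a hypothetical `Snapshot` class stands in for `MetadataSnapshot`/`Cluster`, and an `AtomicReference` stands in for however `Metadata` really publishes its state (an assumption, not taken from the PR). Readers check an invariant that would break if they ever observed a half-applied update. The pool is shut down in a `finally` block at the end; the thread does not say whether that is what the "done at the end of the test" reply refers to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ConcurrentSnapshotSketch {
    // Hypothetical immutable snapshot: both fields are set together in the constructor,
    // so a reader can never see nodeCount from one update and leaderEpoch from another.
    static final class Snapshot {
        final int nodeCount;
        final int leaderEpoch;

        Snapshot(int nodeCount, int leaderEpoch) {
            this.nodeCount = nodeCount;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Starts at 10 nodes / leader epoch 100; every published snapshot keeps leaderEpoch == nodeCount + 90.
    static final AtomicReference<Snapshot> current = new AtomicReference<>(new Snapshot(10, 100));

    /** Runs 3 writer and 3 reader tasks on a 6-thread pool; returns how many inconsistent reads were seen. */
    static int run() {
        ExecutorService service = Executors.newFixedThreadPool(6);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // Writer: publishes snapshots with ever higher node counts and leader epochs.
                results.add(service.submit(() -> {
                    for (int n = 11; n <= 1000; n++) {
                        current.set(new Snapshot(n, n + 90));
                    }
                    return 0;
                }));
                // Reader: fetches the current snapshot and checks the invariant.
                results.add(service.submit(() -> {
                    int bad = 0;
                    for (int k = 0; k < 1000; k++) {
                        Snapshot s = current.get();
                        if (s.leaderEpoch != s.nodeCount + 90) {
                            bad++;
                        }
                    }
                    return bad;
                }));
            }
            int inconsistent = 0;
            for (Future<Integer> f : results) {
                inconsistent += f.get(); // a failed task surfaces here as an ExecutionException
            }
            return inconsistent;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Release the pool's threads once the test body is done.
            service.shutdown();
            try {
                service.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because every writer's last update is the same (1000 nodes, epoch 1090), the final snapshot is deterministic even though the interleaving is not, which gives the test a stable end-state to assert on.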
Re: [PR] KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster [kafka]
msn-tldr commented on code in PR #15385: URL: https://github.com/apache/kafka/pull/15385#discussion_r1502561721 ## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java: ## Review Comment: done
[jira] [Commented] (KAFKA-16282) Allow to get last stable offset (LSO) in kafka-get-offsets.sh
[ https://issues.apache.org/jira/browse/KAFKA-16282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820677#comment-17820677 ] Ahmed Sobeh commented on KAFKA-16282: - Thanks both! I just finished writing up the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh]. Can you please take a look and let me know if you have any comments before I send it out to the mailing list?
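The KIP turns on which offset "latest" resolves to under each isolation level. Per the issue, the tool always sends READ_UNCOMMITTED and therefore gets the high watermark; under READ_COMMITTED the answer would be the last stable offset, which is the first offset of the earliest still-open transaction, capped at the high watermark. A minimal model of that rule follows, using a hypothetical `LatestOffsetSketch` class and a local `Isolation` enum rather than Kafka's own `IsolationLevel` (on the real Admin client the isolation level is passed via `ListOffsetsOptions`).

```java
import java.util.List;

class LatestOffsetSketch {
    // Local stand-in for Kafka's isolation levels; not org.apache.kafka.common.IsolationLevel.
    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    /**
     * Last stable offset: the first offset of the earliest open transaction,
     * or the high watermark if no open transaction starts below it.
     */
    static long lastStableOffset(long highWatermark, List<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    /** What a "latest" lookup returns under each isolation level (a model, not broker code). */
    static long latestOffset(Isolation isolation, long highWatermark, List<Long> openTxnFirstOffsets) {
        return isolation == Isolation.READ_COMMITTED
                ? lastStableOffset(highWatermark, openTxnFirstOffsets)
                : highWatermark;
    }

    public static void main(String[] args) {
        // Hypothetical partition: high watermark 120, one transaction still open since offset 95.
        List<Long> open = List.of(95L);
        System.out.println(latestOffset(Isolation.READ_UNCOMMITTED, 120, open)); // 120
        System.out.println(latestOffset(Isolation.READ_COMMITTED, 120, open));   // 95
    }
}
```

With no open transactions the two levels agree, which is why the difference only shows up for topics written by transactional producers.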
[PR] MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT [kafka]
stanislavkozlovski opened a new pull request, #15431: URL: https://github.com/apache/kafka/pull/15431 This patch updates the 3.7 release branch's version to 3.7.1-SNAPSHOT as per the wiki process.
Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]
mimaison commented on code in PR #15295: URL: https://github.com/apache/kafka/pull/15295#discussion_r1502397230 ## core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala: ## @@ -53,17 +58,26 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit val topic = "topic1" val msg = new Array[Byte](1000) val msgBigger = new Array[Byte](1) - var brokers: Seq[KafkaServer] = _ + var brokers: Seq[KafkaBroker] = _ var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ var consumer: Consumer[Array[Byte], Array[Byte]] = _ + private def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + private def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) + @BeforeEach override def setUp(testInfo: TestInfo): Unit = { +if (TestInfoUtils.isKRaft(testInfo) && metadataVersion.isLessThan(IBP_3_3_IV0)) { Review Comment: Can you explain why we need this change? `metadataVersion` is hard-coded to `MetadataVersion.latestTesting`, so how can it be less than `IBP_3_3_IV0`?
[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820663#comment-17820663 ] Lucas Brutschy commented on KAFKA-16008: Fixed by https://issues.apache.org/jira/browse/KAFKA-16258 > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
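The WARN line quoted above describes the mechanism behind the timeout: the time between consecutive `poll()` calls is compared against `max.poll.interval.ms`. A minimal model of that check follows; `PollIntervalSketch` is hypothetical and is not the consumer's internal timer, and the strict `>` comparison is an assumption based on the log's wording ("longer than").

```java
class PollIntervalSketch {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    /** Called on every poll(); restarts the interval. */
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True once more than maxPollIntervalMs has passed since the last poll(). */
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 300000 ms is the default value of max.poll.interval.ms.
        PollIntervalSketch timer = new PollIntervalSketch(300_000, 0);
        timer.onPoll(1_000);
        System.out.println(timer.pollTimeoutExpired(301_000)); // false: exactly at the limit
        System.out.println(timer.pollTimeoutExpired(301_001)); // true: the gap exceeded the limit
    }
}
```

The two remedies in the log map onto this model: raising `max.poll.interval.ms` increases the limit, while lowering `max.poll.records` shrinks each batch so less time passes between polls.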
[jira] [Resolved] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16008. Resolution: Duplicate
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lucasbru merged PR #15415: URL: https://github.com/apache/kafka/pull/15415
[PR] MINOR: Optimize EventAccumulator [kafka]
dajac opened a new pull request, #15430: URL: https://github.com/apache/kafka/pull/15430 `poll(long timeout, TimeUnit unit)` is only ever called with either `Long.MAX_VALUE` or `0`. This patch replaces it with `poll` and `take`, which removes the `awaitNanos` usage. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
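The reasoning in this PR can be illustrated with a minimal sketch. `SimpleAccumulator` below is hypothetical and much simpler than Kafka's `EventAccumulator` (a single FIFO queue guarded by a `ReentrantLock`). When the only timeouts ever used are `0` and `Long.MAX_VALUE`, a timed wait is unnecessary: the zero case becomes a non-blocking `poll()`, and the unbounded case becomes a `take()` that loops on `Condition.await()`, so no `awaitNanos` deadline bookkeeping is needed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleAccumulator<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> queue = new ArrayDeque<>();

    void add(T event) {
        lock.lock();
        try {
            queue.addLast(event);
            notEmpty.signal(); // wake one blocked take()
        } finally {
            lock.unlock();
        }
    }

    /** Non-blocking: returns the next event or null (the old poll(0, unit) case). */
    T poll() {
        lock.lock();
        try {
            return queue.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until an event is available (the old poll(Long.MAX_VALUE, unit) case). */
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // re-check in a loop to tolerate spurious wakeups
            }
            return queue.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

Using `signal()` rather than `signalAll()` is enough here because each `add` makes exactly one event available, the same approach `java.util.concurrent.ArrayBlockingQueue` takes.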
Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27 With apache kerby [kafka]
mimaison commented on code in PR #15277: URL: https://github.com/apache/kafka/pull/15277#discussion_r1502366164 ## core/src/test/scala/kafka/security/minikdc/MiniKdc.scala: ## @@ -19,38 +19,22 @@ package kafka.security.minikdc import java.io._ -import java.net.InetSocketAddress import java.nio.charset.StandardCharsets import java.nio.file.Files import java.text.MessageFormat import java.util.{Locale, Properties, UUID} - import kafka.utils.{CoreUtils, Exit, Logging} +import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory import scala.jdk.CollectionConverters._ -import org.apache.commons.lang.text.StrSubstitutor -import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry} -import org.apache.directory.api.ldap.model.ldif.LdifReader -import org.apache.directory.api.ldap.model.name.Dn -import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor -import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader -import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager -import org.apache.directory.server.constants.ServerDNConstants -import org.apache.directory.server.core.DefaultDirectoryService -import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout} -import org.apache.directory.server.core.api.schema.SchemaPartition -import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor -import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition} -import org.apache.directory.server.core.partition.ldif.LdifPartition -import org.apache.directory.server.kerberos.KerberosConfig -import org.apache.directory.server.kerberos.kdc.KdcServer -import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory -import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} -import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} -import 
org.apache.directory.server.xdbm.Index -import org.apache.directory.shared.kerberos.KerberosTime +import org.apache.kerby.kerberos.kerb.KrbException +import org.apache.kerby.kerberos.kerb.identity.backend.BackendConfig +import org.apache.kerby.kerberos.kerb.server.{KdcConfig, KdcConfigKey, SimpleKdcServer} import org.apache.kafka.common.utils.{Java, Utils} - +import org.apache.kerby.kerberos.kerb.`type`.KerberosTime +import org.apache.kerby.kerberos.kerb.`type`.base.{EncryptionKey, PrincipalName} +import org.apache.kerby.kerberos.kerb.keytab.{Keytab, KeytabEntry} +import org.apache.kerby.util.NetworkUtil /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone Review Comment: I think this comment still needs to be addressed.
[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
[ https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anatolii Popov updated KAFKA-16105: --- Labels: tiered-storage (was: ) > Reassignment of tiered topics is failing due to RemoteStorageException > -- > > Key: KAFKA-16105 > URL: https://issues.apache.org/jira/browse/KAFKA-16105 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Reporter: Anatolii Popov >Priority: Critical > Labels: tiered-storage > > When partition reassignment is happening for a tiered topic, in most > cases it gets stuck with RemoteStorageExceptions on follower nodes saying that > it cannot construct the remote log auxiliary state: > > {code:java} > [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, > fetcherId=2] Error building remote log auxiliary state for test-24 > (kafka.server.ReplicaFetcherThread) > > org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't > build the state from remote store for partition: test-24, currentLeaderEpoch: > 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the > previous remote log segment metadata was not found > at > kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) > at > kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) > at > kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at >
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) > {code} > > Scenario: > A cluster of 3 nodes with a single topic that has 30 partitions. All partitions > have tiered segments. > Add 3 more nodes to the cluster and run a reassignment that moves all the > data to the new nodes. > Behavior: > For most of the partitions, reassignment proceeds smoothly. > For some of the partitions, when a new node starts to get assignments, it reads the > __remote_log_metadata topic and tries to initialize the metadata cache on > records with COPY_SEGMENT_STARTED. If it reads such a message for a > partition before that partition has been assigned to this node, it ignores > the message, so it skips the cache initialization and marks the partition as > assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never >
Re: [PR] KAFKA-14747: record discarded FK join subscription responses [kafka]
AyoubOm commented on PR #15395: URL: https://github.com/apache/kafka/pull/15395#issuecomment-1963731421 @mjsax Please have a look when you have some time
Re: [PR] MINOR: Fix ReadOnlySessionStore java docs [kafka]
AyoubOm commented on PR #15412: URL: https://github.com/apache/kafka/pull/15412#issuecomment-1963723331 @mimaison Please check this when you have some time
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1502279590 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -20,83 +20,258 @@ import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; -import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroups(String quorum) throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new 
String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + +scala.collection.Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) +@Test public void testListWithUnrecognizedNewConsumerOption() { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testListConsumerGroupsWithStates() throws Exception { +@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) +@MethodSource("getTestQuorumAndGroupProtocolParametersAll") +public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + +createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); +addConsumerGroupExecutor(1, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -scala.collection.Set expectedListing = set(Arrays.asList( -new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), -new ConsumerGroupListing(GROUP, false, Optional.of(ConsumerGroupState.STABLE; +Set expectedListing = 
new HashSet<>(Arrays.asList( +new ConsumerGroupListing( +simpleGroup, +true, +Optional.of(ConsumerGroupState.EMPTY), +Optional.of(GroupType.CLASSIC) +), +new ConsumerGroupListing( +GROUP, +false, +Optional.of(ConsumerGroupState.STABLE), +Optional.of(GroupType.parse(groupProtocol)) +) +)); -final AtomicReference foundListing = new AtomicReference<>(); -TestUtils.waitForCondition(() -> { -