[PR] [KIP-890] Introducing the AbortableTransactionException [kafka]

2024-03-06 Thread via GitHub


sjhajharia opened a new pull request, #15486:
URL: https://github.com/apache/kafka/pull/15486

   As part of KIP-890, we are introducing a new class of exceptions which, when 
encountered, leads to aborting the ongoing transaction. This PR introduces that 
exception along with the client-side handling and server-side changes.
   On the client side, the code handles the exception as an abortable error and 
ensures that it doesn't take the producer to a fatal state.
   On the server side, we have bumped the ProduceRequest and ProduceResponse 
versions and added the necessary handling while maintaining backward 
compatibility with older clients.
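   For illustration, a minimal sketch of the client-side pattern described 
above, following the standard transactional-producer error-handling shape: 
abortable errors lead to `abortTransaction()` rather than a fatal producer 
state. The topic name, class name, and the exact catch placement of the new 
exception are illustrative assumptions, not this PR's final API.
   ```java
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.errors.AuthorizationException;
   import org.apache.kafka.common.errors.OutOfOrderSequenceException;
   import org.apache.kafka.common.errors.ProducerFencedException;

   public class AbortableErrorSketch {
       static void produceTransactionally(KafkaProducer<String, String> producer) {
           try {
               producer.beginTransaction();
               producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
               producer.commitTransaction();
           } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
               // Fatal errors: the producer cannot continue and must be closed.
               producer.close();
           } catch (KafkaException e) {
               // Abortable errors -- per the description above, the new exception
               // class would be handled here: abort and retry without going fatal.
               producer.abortTransaction();
           }
       }
   }
   ```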
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-06 Thread via GitHub


showuon commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-1982498719

   Also, please add some test cases for client and IP cases with sanitized 
strings. Thanks.
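   For context, a small standalone sketch of what sanitization does to entity 
names; `Sanitizer` is `org.apache.kafka.common.utils.Sanitizer`, and the 
example string is illustrative only.
   ```java
   import org.apache.kafka.common.utils.Sanitizer;

   public class SanitizeSketch {
       public static void main(String[] args) {
           // Entity names are percent-encoded so that special characters such as
           // '@' survive being used as node names, and can be decoded back.
           String sanitized = Sanitizer.sanitize("user@example");
           System.out.println(sanitized);                       // user%40example
           System.out.println(Sanitizer.desanitize(sanitized)); // user@example
       }
   }
   ```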





Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-06 Thread via GitHub


showuon commented on code in PR #15481:
URL: https://github.com/apache/kafka/pull/15481#discussion_r1515554131


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -271,6 +273,9 @@ class ZkMigrationIntegrationTest {
   assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
 
   val clientQuotas = image.clientQuotas().entities()
+  val errorUserEntity = new ClientQuotaEntity(Map("user" -> Sanitizer.sanitize(userName)).asJava)
+  assertEquals(false, clientQuotas.containsKey(errorUserEntity))

Review Comment:
   This should not be needed.






Re: [PR] KAFKA-16100: Add timeout to all the CompletableApplicationEvents [kafka]

2024-03-06 Thread via GitHub


kirktrue commented on PR #15455:
URL: https://github.com/apache/kafka/pull/15455#issuecomment-1982356780

   @cadonna—failing tests seem unrelated. Is this OK to merge? Thanks!





Re: [PR] KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on PR #15465:
URL: https://github.com/apache/kafka/pull/15465#issuecomment-1982319460

   @chia7712 PR merged with the latest trunk and ready for review.





Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #14951:
URL: https://github.com/apache/kafka/pull/14951#issuecomment-1982287149

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Updated] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16350:

Summary: StateUpdater does not init transaction after canceling task close 
action  (was: StateUpdated does not init transaction after canceling task close 
action)

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16350) StateUpdated does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16350:

Attachment: 
tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
 processing threads true]-1-output.txt

> StateUpdated does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).





[jira] [Created] (KAFKA-16350) StateUpdated does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16350:
---

 Summary: StateUpdated does not init transaction after canceling 
task close action
 Key: KAFKA-16350
 URL: https://issues.apache.org/jira/browse/KAFKA-16350
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


With EOSv2, we use a thread producer shared across all tasks. We init tx on the 
producer with each _task_ (due to EOSv1 which uses a producer per task), and 
have a guard in place to only init tx a single time.

If we hit an error, we close the producer and create a new one, which is still 
not initialized for transaction. At the same time, with state updater, we 
schedule a "close task" action on error.

For each task we get back, we do cancel the "close task" action, to actually 
keep the task. If this happens for _all_ tasks, we don't have any task in state 
CREATED at hand, and thus we never init the producer for transactions, because 
we assume this was already done.

On the first `send` request, we crash with an IllegalStateException:
{code:java}
Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
{code}
This bug is exposed via EOSIntegrationTest (logs attached).
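
For illustration, a self-contained sketch of the stale "init once" guard 
described above (hypothetical names, not the actual Streams code):
{code:java}
public class StaleInitGuardSketch {
    // Stand-in for a transactional producer with the two states that matter here.
    static class Producer {
        boolean txInitialized = false;
        void initTransactions() { txInitialized = true; }
        void send() {
            if (!txInitialized) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION");
            }
        }
    }

    Producer producer = new Producer();
    boolean initOnceGuard = false; // guard to only init tx a single time

    void maybeInitTransactions() {
        if (!initOnceGuard) {
            producer.initTransactions();
            initOnceGuard = true;
        }
    }

    void onErrorRecreateProducer() {
        producer = new Producer(); // fresh producer is UNINITIALIZED...
        // ...but initOnceGuard stays true, so the next init call is a no-op.
    }

    public static void main(String[] args) {
        StaleInitGuardSketch s = new StaleInitGuardSketch();
        s.maybeInitTransactions();   // first init: fine
        s.onErrorRecreateProducer(); // error path recreates the producer
        s.maybeInitTransactions();   // skipped: guard claims "already done"
        s.producer.send();           // crashes, as in the report above
    }
}
{code}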





Re: [PR] KAFKA-10892: Shared Readonly State Stores ( revisited ) [kafka]

2024-03-06 Thread via GitHub


wcarlson5 commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1515301279


##
streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ReadOnlyStoreTest {
+
+@Test
+public void shouldLoadDataIntoReadOnlyStoreAndAllowAccessFromProcessor() {

Review Comment:
   This kind of sounds like it's testing the error path: what happens if a 
processor that shouldn't have access to the store tries to write to/access it? 
Do we have a test for that? It seems that's what's special about a read-only 
store.



##
streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ReadOnlyStoreTest {
+
+@Test
+public void shouldLoadDataIntoReadOnlyStoreAndAllowAccessFromProcessor() {
+final Topology topology = new Topology();
+topology.addReadOnlyStateStore(
+Stores.keyValueStoreBuilder(
+Stores.inMemoryKeyValueStore("readOnlyStore"),
+new Serdes.IntegerSerde(),
+new 

Re: [PR] MINOR: Add version 3.7 to the Kafka Streams system tests [kafka]

2024-03-06 Thread via GitHub


chia7712 closed pull request #15453: MINOR: Add version 3.7 to the Kafka 
Streams system tests
URL: https://github.com/apache/kafka/pull/15453





Re: [PR] MINOR: Add version 3.7 to the Kafka Streams system tests [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15453:
URL: https://github.com/apache/kafka/pull/15453#issuecomment-1982060230

   close this PR as #15443 is merged





Re: [PR] KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15465:
URL: https://github.com/apache/kafka/pull/15465#issuecomment-1982055889

   @nizhikov I'm ready to review this PR, so please rebase it. thanks!





Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1982054671

   @nizhikov thanks for this nice patch





Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 merged PR #15363:
URL: https://github.com/apache/kafka/pull/15363





Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1982051350

   the failed tests pass on my machine.
   ```sh
   ./gradlew cleanTest core:test --tests ReplicaManagerTest --tests 
LogDirFailureTest
   ```
   will merge it





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515290353


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e);
+return new HashMap<>(topicFutures);
 }
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {

Review Comment:
   Because the current implementation sends the topic list along with the 
cursor, if the cursor topic is missing from the topic list, the request will 
fail with an invalid-request error.




Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515289272


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e);
+return new HashMap<>(topicFutures);
 }
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {
+if (partiallyFinishedTopicDescription == null) {
+// The previous round cursor can point to the 
partition 0 of the next topic.
+partiallyFinishedTopicDescription = 
currentTopicDescription;
+} else {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+}
+
+if (responseCursor == null 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515288246


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);

Review Comment:
   Yes, we don't need to remove it. 
   Also added a failure handling test.






Re: [PR] KAFKA-15964: fix flaky StreamsAssignmentScaleTest [kafka]

2024-03-06 Thread via GitHub


mjsax commented on code in PR #15485:
URL: https://github.com/apache/kafka/pull/15485#discussion_r1515287246


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -628,7 +628,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
 
  log.info("{} client nodes and {} consumers participating in this rebalance: \n{}.",
   clientStates.size(),
-  clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum),
+  clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum).orElse(0),

Review Comment:
   Minor fix: It's an `Optional` and the log currently says: 
   ```
   5 client nodes and Optional[10] consumers participating in this rebalance
   ```
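
   For context, a tiny standalone example of why the `.orElse(0)` is needed -- 
`Stream#reduce` without an identity returns an `Optional`, and its `toString()` 
is what leaked into the log line.
   ```java
   import java.util.List;
   import java.util.Optional;

   public class ReduceOptionalDemo {
       public static void main(String[] args) {
           List<Integer> capacities = List.of(2, 2, 2, 2, 2);

           Optional<Integer> sum = capacities.stream().reduce(Integer::sum);
           System.out.println(sum);           // Optional[10] -- what the log printed
           System.out.println(sum.orElse(0)); // 10           -- with the fix
       }
   }
   ```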






[PR] KAFKA-15964: fix flaky StreamsAssignmentScaleTest [kafka]

2024-03-06 Thread via GitHub


mjsax opened a new pull request, #15485:
URL: https://github.com/apache/kafka/pull/15485

   This PR bumps some timeouts due to slow Jenkins builds.
   
   +++
   I looked into the logs, and it seems that some assignments take up to 90 
seconds when they time out. I also talked to @ableegoldman about the purpose of 
this test, and it's really about not hitting `max.poll.interval.ms` -- so with 
a timeout of 2 minutes instead of 1, we still have enough headroom.





[jira] [Assigned] (KAFKA-15964) Flaky test: testHighAvailabilityTaskAssignorLargeNumConsumers – org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15964:
---

Assignee: Matthias J. Sax

> Flaky test: testHighAvailabilityTaskAssignorLargeNumConsumers – 
> org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest
> ---
>
> Key: KAFKA-15964
> URL: https://issues.apache.org/jira/browse/KAFKA-15964
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> PR build: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14767/15/tests/]
>  
> {code:java}
> java.lang.AssertionError: The first assignment took too long to complete at 
> 94250ms.Stacktracejava.lang.AssertionError: The first assignment took too 
> long to complete at 94250ms.at 
> org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.completeLargeAssignment(StreamsAssignmentScaleTest.java:220)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorLargeNumConsumers(StreamsAssignmentScaleTest.java:85)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568)   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>  {code}





[jira] [Commented] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition

2024-03-06 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824207#comment-17824207
 ] 

Ismael Juma commented on KAFKA-16349:
-

Good catch.

> ShutdownableThread fails build by calling Exit with race condition
> --
>
> Key: KAFKA-16349
> URL: https://issues.apache.org/jira/browse/KAFKA-16349
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Minor
>
> `ShutdownableThread` calls `Exit.exit()` when the thread's operation throws 
> FatalExitError. In normal operation, this calls System.exit, and exits the 
> process. In tests, the exit procedure is masked with Exit.setExitProcedure to 
> prevent tests that encounter a FatalExitError from crashing the test JVM.
> Masking of exit procedures is usually done in BeforeEach/AfterEach 
> annotations, with the exit procedures cleaned up immediately after the test 
> finishes. If the body of the test creates a ShutdownableThread that outlives 
> the test, such as by omitting `ShutdownableThread#awaitShutdown`, by having 
> `ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a 
> race condition between `Exit.resetExitProcedure` and `Exit.exit`, then 
> System.exit() can be called erroneously.
>  
> {noformat}
> // First, in the test thread:
> Exit.setExitProcedure(...)
> try {
> new ShutdownableThread(...).start()
> } finally {
> Exit.resetExitProcedure()
> }
> // Second, in the ShutdownableThread:
> try {
> throw new FatalExitError(...)
> } catch (FatalExitError e) {
> Exit.exit(...) // Calls real System.exit()
> }{noformat}
>  
> This can be resolved by one of the following:
>  # Eliminate FatalExitError usages in code when setExitProcedure is in-use
>  # Eliminate the Exit.exit call from ShutdownableThread, and instead 
> propagate this error to another thread to handle without a race-condition
>  # Eliminate resetExitProcedure by refactoring Exit to be non-static
> FatalExitError is in use in a small number of places, but may be difficult to 
> eliminate:
>  * FinalizedFeatureChangeListener
>  * InterBrokerSendThread
>  * TopicBasedRemoteLogMetadataManager
> It appears that every other use of Exit.setExitProcedure/Exit.exit is done on 
> a single thread, so ShutdownableThread is the only place where this race 
> condition is present.
> The effect of this bug is that the build is flaky, as race 
> conditions/timeouts in tests can cause the gradle executors to exit with 
> status code 1, which has happened 26 times in the last 28 days. I have not 
> yet been able to confirm this bug is happening in other tests, but I do have 
> a deterministic reproduction case with the exact same symptoms:
> {noformat}
> Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest 
> > testShutdownWhenTestTimesOut(boolean) > 
> "testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:test'.
> > Process 'Gradle Test Executor 38' finished with non-zero exit value 1
>   This problem might be caused by incorrect test process configuration.
>   For more on test execution, please refer to 
> https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
> the Gradle documentation.{noformat}





[PR] KAFKA-16349: Reproduce ShutdownableThread Exit race condition [kafka]

2024-03-06 Thread via GitHub


gharris1727 opened a new pull request, #15484:
URL: https://github.com/apache/kafka/pull/15484

   This test causes the build to fail with the current ShutdownableThread 
implementation, which calls Exit.
   This is because Exit is inherently thread-unsafe, and a ShutdownableThread 
may outlive the window during which the exit procedures are masked.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition

2024-03-06 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16349:
---

 Summary: ShutdownableThread fails build by calling Exit with race 
condition
 Key: KAFKA-16349
 URL: https://issues.apache.org/jira/browse/KAFKA-16349
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.8.0
Reporter: Greg Harris


`ShutdownableThread` calls `Exit.exit()` when the thread's operation throws 
FatalExitError. In normal operation, this calls System.exit, and exits the 
process. In tests, the exit procedure is masked with Exit.setExitProcedure to 
prevent tests that encounter a FatalExitError from crashing the test JVM.

Masking of exit procedures is usually done in BeforeEach/AfterEach annotations, 
with the exit procedures cleaned up immediately after the test finishes. If the 
body of the test creates a ShutdownableThread that outlives the test, such as 
by omitting `ShutdownableThread#awaitShutdown`, by having 
`ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a 
race condition between `Exit.resetExitProcedure` and `Exit.exit`, then 
System.exit() can be called erroneously.

 
{noformat}
// First, in the test thread:
Exit.setExitProcedure(...)
try {
new ShutdownableThread(...).start()
} finally {
Exit.resetExitProcedure()
}
// Second, in the ShutdownableThread:
try {
throw new FatalExitError(...)
} catch (FatalExitError e) {
Exit.exit(...) // Calls real System.exit()
}{noformat}
 

This can be resolved by one of the following:
 # Eliminate FatalExitError usages in code when setExitProcedure is in-use
 # Eliminate the Exit.exit call from ShutdownableThread, and instead propagate 
this error to another thread to handle without a race-condition
 # Eliminate resetExitProcedure by refactoring Exit to be non-static

FatalExitError is in use in a small number of places, but may be difficult to 
eliminate:
 * FinalizedFeatureChangeListener
 * InterBrokerSendThread
 * TopicBasedRemoteLogMetadataManager

It appears that every other use of Exit.setExitProcedure/Exit.exit is done on a 
single thread, so ShutdownableThread is the only place where this race 
condition is present.

The effect of this bug is that the build is flaky, as race conditions/timeouts 
in tests can cause the gradle executors to exit with status code 1, which has 
happened 26 times in the last 28 days. I have not yet been able to confirm this 
bug is happening in other tests, but I do have a deterministic reproduction 
case with the exact same symptoms:
{noformat}
Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest > 
testShutdownWhenTestTimesOut(boolean) > 
"testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':core:test'.
> Process 'Gradle Test Executor 38' finished with non-zero exit value 1
  This problem might be caused by incorrect test process configuration.
  For more on test execution, please refer to 
https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
the Gradle documentation.{noformat}
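
For illustration, a sketch of resolution #3 from the list above -- making the 
exit procedure injected, per-instance state rather than mutable static state. 
All names here are hypothetical:
{code:java}
import java.util.function.IntConsumer;

public class InjectedExitSketch {
    // A handle captured at construction time cannot be "reset" out from under
    // a thread that outlives the test, which removes the race described above.
    static class ExitHandle {
        private final IntConsumer procedure;
        ExitHandle(IntConsumer procedure) { this.procedure = procedure; }
        void exit(int statusCode) { procedure.accept(statusCode); }
    }

    static class WorkerThread extends Thread {
        private final ExitHandle exit;
        WorkerThread(ExitHandle exit) { this.exit = exit; }
        @Override
        public void run() {
            try {
                throw new RuntimeException("stand-in for FatalExitError");
            } catch (RuntimeException fatal) {
                // Always uses the handle it was built with; no global
                // resetExitProcedure() can race with this call.
                exit.exit(1);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExitHandle testExit = new ExitHandle(code -> System.out.println("masked exit(" + code + ")"));
        Thread t = new WorkerThread(testExit);
        t.start();
        t.join(); // even if the "test" returned earlier, the thread's handle is unchanged
    }
}
{code}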





Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-03-06 Thread via GitHub


caioguedes commented on PR #15482:
URL: https://github.com/apache/kafka/pull/15482#issuecomment-1981952353

   Thanks @mjsax, I will fix the problems in the CI and leave it open.





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513820234


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);

Review Comment:
   This will be executed concurrently with the thread that called 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi, so we may 
concurrently modify topic futures while the HashMap constructor at line 2313 is 
iterating over it and it's not a thread-safe collection.
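
   For context, a self-contained demonstration (hypothetical data, not the 
admin-client code) of the hazard described above: the `HashMap` copy 
constructor iterates over the source map, so a concurrent modification from 
another thread can fail fast or yield an inconsistent copy.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class ConcurrentCopyDemo {
       public static void main(String[] args) throws InterruptedException {
           Map<String, Integer> futures = new HashMap<>();
           for (int i = 0; i < 100_000; i++) futures.put("topic-" + i, i);

           Thread mutator = new Thread(() -> {
               for (int i = 0; i < 100_000; i++) futures.remove("topic-" + i);
           });
           mutator.start();
           try {
               // Copying iterates with HashMap's fail-fast iterator.
               Map<String, Integer> copy = new HashMap<>(futures);
               System.out.println("copied " + copy.size() + " entries (racy result)");
           } catch (java.util.ConcurrentModificationException e) {
               System.out.println("ConcurrentModificationException, as flagged in the review");
           }
           mutator.join();
       }
   }
   ```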



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new 

Re: [PR] KAFKA-10892: Shared Readonly State Stores ( revisited ) [kafka]

2024-03-06 Thread via GitHub


mjsax commented on PR #12742:
URL: https://github.com/apache/kafka/pull/12742#issuecomment-1981929378

   @calmera -- As discussed in person (a long time back...), I took the liberty 
of updating this PR. Hope we can get this merged soon.





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-06 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1515212230


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -25,36 +25,46 @@
 
 public final class StreamStreamJoinUtil {
 
-private StreamStreamJoinUtil(){
+private StreamStreamJoinUtil() {
 }
 
 public static  boolean skipRecord(
 final Record record, final Logger logger,
 final Sensor droppedRecordsSensor,
-final ProcessorContext context) {
+final ProcessorContext context
+) {
 // we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = 
context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, 
partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+logSkip("null key or value", logger, droppedRecordsSensor, 
context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void logSkip(
+final String reason,
+final Logger logger,
+final Sensor droppedRecordsSensor,
+final ProcessorContext context
+) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+logger.warn(
+"Skipping record. reason=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review Comment:
   Nit: Just comparing to 
`AbstractKStreamTimeWindowAggregateProcessor#logSkippedRecordForExpiredWindow`, 
it seems we could add more information -- should we merge both into a single 
"skip reason" as your PR proposes, or have two different outputs: one for the 
null-key case, and a different one for the "expired" case, similar to the 
windowed aggregation?



##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Why is this record already in the output? We should not drop it, but it 
seems we cannot emit it right away either, because we need to wait until the 
window closes -- so we would need to pipe one more record with ts=91 to flush 
out this result?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
 }
 }
 
+@Test
+public void recordsArrivingPostWindowCloseShouldBeDropped() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream joined = builder.stream(topic1, 
consumed).join(
+builder.stream(topic2, consumed),
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+joined.process(supplier);
+
+
+try (final TopologyTestDriver driver = new 

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-06 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1981795443

   @mjsax thanks, it is rebased.
   
   The tests didn't need any adjustments due to the idea quoted below.
   > I realized that if we only want to assert that late records get dropped 
and not look at the join result then we could even reuse the same test case for 
all three involved operators (inner, left, outer) as shown in 
`KStreamKStreamWindowCloseTest`.
   
   ...
   
   > If you agree then I would remove the 
`.recordsArrivingPostWindowCloseShouldBeDropped()` from 
`KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and 
KStreamKStreamOuterJoinTest`.
   
   I have removed the extensions to KStreamKStreamJoinTest, 
KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest again.
   
   Do you agree with this approach? 
   
   





Re: [PR] MINOR: Add 3.7 to Kafka Streams system tests [kafka]

2024-03-06 Thread via GitHub


mjsax merged PR #15443:
URL: https://github.com/apache/kafka/pull/15443





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057898


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Updated the server-side code and added more tests with invalid cursor.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057124


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Now it will query the cluster info for the nodes.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using 
DescribeTopicPartitions API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);

Review Comment:
   Good point. Updated.






[jira] [Updated] (KAFKA-16343) Improve tests of streams foreignkeyjoin package

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16343:

Component/s: unit tests

> Improve tests of streams foreignkeyjoin package
> ---
>
> Key: KAFKA-16343
> URL: https://issues.apache.org/jira/browse/KAFKA-16343
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> Some classes are not tested in streams foreignkeyjoin package, such as 
> SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
> Corresponding tests should be added.
> The class ForeignTableJoinProcessorSupplierTest should be renamed as it is 
> not testing ForeignTableJoinProcessor, but rather 
> SubscriptionJoinProcessorSupplier.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-03-06 Thread via GitHub


mjsax commented on PR #15482:
URL: https://github.com/apache/kafka/pull/15482#issuecomment-1981581044

   The next release is 3.8, so we cannot merge this yet. We can of course leave 
this PR open for the time being, and I can review it when we start the 4.0 release 
(which is planned after 3.8).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16335) Remove Deprecated method on StreamPartitioner

2024-03-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824137#comment-17824137
 ] 

Matthias J. Sax commented on KAFKA-16335:
-

Thanks for your interest in working on this ticket. Note that the next release 
will be 3.8, so we cannot complete this ticket right now.

> Remove Deprecated method on StreamPartitioner
> -
>
> Key: KAFKA-16335
> URL: https://issues.apache.org/jira/browse/KAFKA-16335
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Caio César
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in 3.4 release via 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356]
>  * StreamPartitioner#partition (singular)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514997085


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -322,6 +324,20 @@ private void invalidOffset(String offset) {
 "'earliest', 'latest', or a non-negative long.");
 }
 
+private long parseTimeoutMs() {
+long timeout;
+if (options.has(timeoutMsOpt)) {
+timeout = options.valueOf(timeoutMsOpt);
+if (timeout < 0) {
+CommandLineUtils.printUsageAndExit(parser, "The provided timeout-ms value '" + timeout +

Review Comment:
   Returned to the previous logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-03-06 Thread via GitHub


gharris1727 merged PR #15316:
URL: https://github.com/apache/kafka/pull/15316


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-03-06 Thread via GitHub


gharris1727 commented on PR #15316:
URL: https://github.com/apache/kafka/pull/15316#issuecomment-1981533086

   Test failures appear unrelated, and the runtime tests pass locally for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15797: Fix flaky EOS_v2 upgrade test [kafka]

2024-03-06 Thread via GitHub


mjsax merged PR #15449:
URL: https://github.com/apache/kafka/pull/15449


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16288) Values.convertToDecimal throws ClassCastExceptions on String inputs

2024-03-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16288.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Values.convertToDecimal throws ClassCastExceptions on String inputs
> ---
>
> Key: KAFKA-16288
> URL: https://issues.apache.org/jira/browse/KAFKA-16288
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 1.1.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> The convertToDecimal function does a best-effort conversion of an arbitrary 
> Object to a BigDecimal. Generally when a conversion cannot take place (such 
> as when an unknown subclass is passed-in) the function throws a 
> DataException. However, specifically for String inputs with valid number 
> within, a ClassCastException is thrown.
> This is because there is an extra "doubleValue" call in the implementation: 
> [https://github.com/apache/kafka/blob/ead2431c37ace9255df88ffe819bb905311af088/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L427]
>  which immediately causes a ClassCastException in the caller: 
> [https://github.com/apache/kafka/blob/ead2431c37ace9255df88ffe819bb905311af088/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L305]
>  
> This appears accidental, because the case for String is explicitly handled, 
> it just behaves poorly. Instead of the ClassCastException, the number should 
> be parsed correctly.
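
For illustration, a minimal repro of the failure mode described above (hypothetical values, not the project's test code):

{code:java}
import java.math.BigDecimal;

public class ConvertToDecimalRepro {
    public static void main(String[] args) {
        // The String is parsed into a number, but the extra doubleValue() step
        // produces a Double; the caller then casts it to BigDecimal and fails.
        Object parsed = Double.parseDouble("1.5");    // a Double, not a BigDecimal
        try {
            BigDecimal value = (BigDecimal) parsed;   // mirrors the bad cast
            System.out.println(value);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as the report describes");
        }
    }
}
{code}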



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


mimaison commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514959532


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -322,6 +324,20 @@ private void invalidOffset(String offset) {
 "'earliest', 'latest', or a non-negative long.");
 }
 
+private long parseTimeoutMs() {
+long timeout;
+if (options.has(timeoutMsOpt)) {
+timeout = options.valueOf(timeoutMsOpt);
+if (timeout < 0) {
+CommandLineUtils.printUsageAndExit(parser, "The provided timeout-ms value '" + timeout +

Review Comment:
   It seems previously we wouldn't fail if a negative value was provided.



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -385,10 +404,14 @@ String bootstrapServer() {
 return options.valueOf(bootstrapServerOpt);
 }
 
-String includedTopicsArg() {
-return options.has(includeOpt)
-? options.valueOf(includeOpt)
-: options.valueOf(whitelistOpt);
+Optional<String> includedTopicsArg() {
+if (options.has(includeOpt)) {
+return Optional.of(options.valueOf(includeOpt));
+} else if (options.has(whitelistOpt)) {
+return Optional.of(options.valueOf(whitelistOpt));
+} else {
+return Optional.empty();
+}

Review Comment:
   Could this be simplified into:
   ```
   return options.has(includeOpt)
   ? Optional.of(options.valueOf(includeOpt))
   : Optional.ofNullable(options.valueOf(whitelistOpt));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514949575


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional<String> topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional<String> includedTopics;
-final Consumer<byte[], byte[]> consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer<byte[], byte[]> consumer;
 
 Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-public ConsumerWrapper(Optional<String> topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional<String> includedTopics,
-   Consumer<byte[], byte[]> consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+Optional<String> topic = Optional.ofNullable(opts.topicArg());
+Optional<String> includedTopics = Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;

Review Comment:
   Done.



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional<String> topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional<String> includedTopics;
-final Consumer<byte[], byte[]> consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer<byte[], byte[]> consumer;
 
 Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-public ConsumerWrapper(Optional<String> topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional<String> includedTopics,
-   Consumer<byte[], byte[]> consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+Optional<String> topic = Optional.ofNullable(opts.topicArg());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add read/write all operation [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on code in PR #15462:
URL: https://github.com/apache/kafka/pull/15462#discussion_r1514944319


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -498,29 +497,17 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
 );
 }
 
-final Set<TopicPartition> existingPartitionSet = runtime.partitions();
-
-if (existingPartitionSet.isEmpty()) {
-return CompletableFuture.completedFuture(new ListGroupsResponseData());
-}
-
-final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures =
-new ArrayList<>();
-
-for (TopicPartition tp : existingPartitionSet) {
-futures.add(runtime.scheduleReadOperation(
-"list-groups",
-tp,
-(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)
-).exceptionally(exception -> {
-exception = Errors.maybeUnwrapException(exception);
-if (exception instanceof NotCoordinatorException) {
-return Collections.emptyList();
-} else {
-throw new CompletionException(exception);
-}
-}));
-}
+final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures = runtime.scheduleReadAllOperation(
+"list-groups",
+(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)
+).stream().map(future -> future.exceptionally(exception -> {

Review Comment:
   > adding a helper like mapExceptionally to add an exception handler on a 
list of futures
   
   Do you mean to enhance `FutureUtils#combineFutures` to take one more 
parameter? The current PR is good to me, but I'd like to see more of the design 
(`mapExceptionally`) before merging :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-06 Thread via GitHub


danielgospodinow commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1514907422


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
 @Override
 public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
 Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
-for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
+for (TopicPartition topicPartition : keys) {

Review Comment:
   Oooh, that makes complete sense now! Thanks a lot for the explanation, 
Andrew!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add read/write all operation [kafka]

2024-03-06 Thread via GitHub


dajac commented on code in PR #15462:
URL: https://github.com/apache/kafka/pull/15462#discussion_r1514893077


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -498,29 +497,17 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
 );
 }
 
-final Set<TopicPartition> existingPartitionSet = runtime.partitions();
-
-if (existingPartitionSet.isEmpty()) {
-return CompletableFuture.completedFuture(new ListGroupsResponseData());
-}
-
-final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures =
-new ArrayList<>();
-
-for (TopicPartition tp : existingPartitionSet) {
-futures.add(runtime.scheduleReadOperation(
-"list-groups",
-tp,
-(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)
-).exceptionally(exception -> {
-exception = Errors.maybeUnwrapException(exception);
-if (exception instanceof NotCoordinatorException) {
-return Collections.emptyList();
-} else {
-throw new CompletionException(exception);
-}
-}));
-}
+final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures = runtime.scheduleReadAllOperation(
+"list-groups",
+(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)
+).stream().map(future -> future.exceptionally(exception -> {

Review Comment:
   I actually considered this but eventually rejected it because those new 
methods could also be used with a different handling than `exceptionally`; the 
separation of concerns would not be too good, in my opinion. I was also 
considering adding a helper like `mapExceptionally` to add an exception 
handler on a list of futures. Do you think that it could also help?
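   
   For illustration, a minimal sketch of what such a helper could look like (hypothetical class and method shape, not code from the PR):
   
   ```
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   
   public final class FutureHelpers {
       // Apply the same exception handler to every future in a list, returning
       // the wrapped futures so the caller can combine them afterwards.
       public static <T> List<CompletableFuture<T>> mapExceptionally(
               List<CompletableFuture<T>> futures,
               Function<Throwable, T> handler) {
           return futures.stream()
                   .map(future -> future.exceptionally(handler))
                   .collect(Collectors.toList());
       }
   }
   ```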



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1514890396


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
 @Override
 public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
 Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
-for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
+for (TopicPartition topicPartition : keys) {

Review Comment:
   The PartitionLeaderStrategy builds a map from broker ID to the set of 
topic-partitions it leads. So, it has essentially divided up the full set of 
topic-partitions for which records are being deleted. Each call to 
`buildBatchedRequest` is supposed to build a broker-specific list of 
DeleteRecordsPartitions. But actually, it was using the complete list for the 
entire multi-leader operation (which is in `this.recordsToDelete`). So, every 
broker got every topic-partition, even ones that it didn't lead. By using the 
set of topic-partitions in the `keys` argument, it now uses the relevant 
subset of the topic-partitions for the broker in question.
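   
   For readers following along, a self-contained toy sketch of the per-leader batching idea (illustrative names only, not Kafka's actual classes):
   
   ```
   import java.util.*;
   import java.util.stream.Collectors;
   
   public class PerBrokerBatching {
       public static void main(String[] args) {
           // partition -> leader broker id (what the lookup strategy discovers)
           Map<String, Integer> leaderByPartition = new HashMap<>();
           leaderByPartition.put("t-0", 1);
           leaderByPartition.put("t-1", 2);
           leaderByPartition.put("t-2", 1);
   
           // broker id -> only the partitions that broker leads
           Map<Integer, Set<String>> keysByBroker = leaderByPartition.entrySet().stream()
               .collect(Collectors.groupingBy(Map.Entry::getValue,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
   
           // Each per-broker request is built from its own key subset only.
           keysByBroker.forEach((broker, keys) ->
               System.out.println("broker " + broker + " -> " + keys));
       }
   }
   ```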



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-03-06 Thread via GitHub


clolov commented on PR #15261:
URL: https://github.com/apache/kafka/pull/15261#issuecomment-1981388046

   Heya @cadonna, apologies for the delay! Hopefully the latest version 
addresses your comments, but if not let me know and I will pick it up tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-06 Thread via GitHub


danielgospodinow commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1514870475


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
 @Override
 public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
 Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
-for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
+for (TopicPartition topicPartition : keys) {

Review Comment:
   I'm having trouble understanding how changing the iteration over 
`recordsToDelete` to `keys` resolves the issue. May I ask for a quick 
explanation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1981384111

   @nizhikov Thanks for the review.
   All your comments are fixed.
   Please take a look one more time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


mimaison commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514821883


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional<String> topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional<String> includedTopics;
-final Consumer<byte[], byte[]> consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer<byte[], byte[]> consumer;
 
 Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-public ConsumerWrapper(Optional<String> topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional<String> includedTopics,
-   Consumer<byte[], byte[]> consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+Optional<String> topic = Optional.ofNullable(opts.topicArg());
+Optional<String> includedTopics = Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;

Review Comment:
   Could we merge this with the `timeoutMs()` method from 
`ConsoleConsumerOptions`?
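   
   For illustration, a sketch of what the merged accessor could look like, assuming a negative or absent value should mean "wait forever" (it relies on the surrounding class's `options` and `timeoutMsOpt` fields):
   
   ```
   // Hypothetical merged accessor: parse once, defaulting negatives to "no timeout".
   long timeoutMs() {
       long timeout = options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1L;
       return timeout >= 0 ? timeout : Long.MAX_VALUE;
   }
   ```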



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional<String> topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional<String> includedTopics;
-final Consumer<byte[], byte[]> consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer<byte[], byte[]> consumer;
 
 Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-public ConsumerWrapper(Optional<String> topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional<String> includedTopics,
-   Consumer<byte[], byte[]> consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+Optional<String> topic = Optional.ofNullable(opts.topicArg());

Review Comment:
   Could `topicArg()` directly return `Optional<String>`?
   Same below for `includedTopicsArg()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514833400


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+cgcArgs.addAll(Arrays.asList(describeType));
+ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
+
+String output = 
kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));
+assertTrue(output.contains("Consumer group '" + missingGroup + "' 
does not exist."),
+"Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
+}
+}
+
+@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+@ValueSource(strings = {"zk", "kraft"})
+public void testDescribeWithMultipleSubActions(String quorum) {
+AtomicInteger exitStatus = new AtomicInteger(0);
+AtomicReference exitMessage = new 

Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514832937


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};

Review Comment:
   Yes. Refactored



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 

Re: [PR] KAFKA-16318 [WIP]: add javadoc for kafka metric [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1514824043


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -20,6 +20,37 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Time;
 
+/**
+ * An implementation of the {@link Metric} interface.
+ * 
+ * A KafkaMetric is a named metric for monitoring purposes. The metric value can be a {@link Measurable} or a {@link Gauge}.
+ * 
+ * metricName The name of the metric
+ * lock A lock used for reading the metric value in case of race 
condition
+ * time The POSIX time in milliseconds the metric is being taken
+ * metricValueProvider The metric collecting implementation that 
implements {@link MetricValueProvider}
+ * config The metric configuration which is a {@link MetricConfig}
+ * 
+ * 
+ * Usage looks something like this:
+ *
+ * {@code
+ * // set up metrics:
+ *
+ * Map<String, String> tags = new HashMap<>();
+ * tags.put("key1", "value1");
+ *
+ * MetricConfig config = new MetricConfig().tags(tags);
+ * Time time = new SystemTime();
+ * MetricName metricName = new MetricName("message-size-max", "producer-metrics");
+ *
+ * KafkaMetric m = new KafkaMetric(new Object(),

Review Comment:
   It seems to me the point of usage is how to use its public methods rather 
than how to build this object. The reason of making `KafkaMetrics` be a public 
Api is that `MetricsReporter` 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java#L39),
 which is a public interface, has many methods taking `KafkaMetric` as argument.
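   
   As an illustration of that point, a minimal reporter sketch that only reads metrics (not part of the PR):
   
   ```
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.common.metrics.KafkaMetric;
   import org.apache.kafka.common.metrics.MetricsReporter;
   
   // Reporters receive KafkaMetric instances from the runtime and read them;
   // they do not construct the metrics themselves.
   public class LoggingReporter implements MetricsReporter {
       @Override
       public void configure(Map<String, ?> configs) { }
   
       @Override
       public void init(List<KafkaMetric> metrics) {
           metrics.forEach(this::metricChange);
       }
   
       @Override
       public void metricChange(KafkaMetric metric) {
           System.out.println(metric.metricName() + " = " + metric.metricValue());
       }
   
       @Override
       public void metricRemoval(KafkaMetric metric) {
           System.out.println("removed " + metric.metricName());
       }
   
       @Override
       public void close() { }
   }
   ```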



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-06 Thread via GitHub


iit2009060 commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1981236742

   Yes @showuon, definitely. 
   Can you help me with the reference or the process to start working on it? Do 
I need to create a different JIRA for it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16318 [WIP]: add javadoc for kafka metric [kafka]

2024-03-06 Thread via GitHub


johnnychhsu opened a new pull request, #15483:
URL: https://github.com/apache/kafka/pull/15483

   ## Context
   There was no javadoc for `KafkaMetric`
   
   ## Solution
   Add the javadoc for this class
   
   ## Test
   Add screenshots 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514645806


##
tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java:
##
@@ -210,6 +212,29 @@ public static File tempPropertiesFile(Map 
properties) throws IOE
 return org.apache.kafka.test.TestUtils.tempFile(sb.toString());
 }
 
+/**
+ * Invoke `compute` until `predicate` is true or `waitTime` elapses.
+ *
+ * Return the last `compute` result and a boolean indicating whether 
`predicate` succeeded for that value.
+ *
+ * This method is useful in cases where `waitUntilTrue` makes it awkward 
to provide good error messages.
+ */
+public static <T> Tuple2<T, Boolean> computeUntilTrue(Supplier<T> compute, long waitTime, long pause, Predicate<T> predicate) {

Review Comment:
   this change can be reverted since it is unused now.



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+

[jira] [Assigned] (KAFKA-16335) Remove Deprecated method on StreamPartitioner

2024-03-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caio César reassigned KAFKA-16335:
--

Assignee: Caio César

> Remove Deprecated method on StreamPartitioner
> -
>
> Key: KAFKA-16335
> URL: https://issues.apache.org/jira/browse/KAFKA-16335
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Caio César
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in 3.4 release via 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356]
>  * StreamPartitioner#partition (singular)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15949) Improve the KRaft metadata version related messages

2024-03-06 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-15949:
-

Assignee: PoAn Yang

> Improve the KRaft metadata version related messages
> ---
>
> Key: KAFKA-15949
> URL: https://issues.apache.org/jira/browse/KAFKA-15949
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.0
>Reporter: Jakub Scholz
>Assignee: PoAn Yang
>Priority: Major
>
> Various error messages related to KRaft seem to use very different style and 
> formatting. Just for example in the {{StorageTool}} Scala class, there are 
> two different examples:
>  * {{Must specify a valid KRaft metadata version of at least 3.0.}}
>  ** Refers to "metadata version"
>  ** Refers to the version as 3.0 (although strictly speaking 3.0-IV0 is not 
> valid for KRaft)
>  * {{SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.}}
>  ** Talks about "metadataVersion"
>  ** Refers to "IBP_3_5_IV2" instead of "3.5" or "3.5-IV2"
> Other pieces of Kafka code seem to also talk about "metadata.version" for 
> example.
> For users, it would be nice if the style and formats used were the same 
> everywhere. Would it be worth unifying messages like this? If yes, what would 
> be the preferred style to use?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-03-06 Thread via GitHub


caioguedes opened a new pull request, #15482:
URL: https://github.com/apache/kafka/pull/15482

   Remove the deprecated method StreamPartitioner#partition.
   
   All tests were updated to the new signature of `StreamPartitioner#partitions`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -202,6 +203,9 @@ public List read() {
 
 private final UnifiedLog mockLog = mock(UnifiedLog.class);
 
+Integer maxEntries = 30;

Review Comment:
   `private final static` please
   
   Also, for constants we usually use capital snake-case syntax, such as MAX_ENTRIES
   
   (same for base offset)
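   
   A sketch of the suggested style (the base-offset value is illustrative):
   
   ```
   private static final int MAX_ENTRIES = 30;
   private static final long BASE_OFFSET = 0L; // illustrative value
   ```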



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 File logFile = segment.log().file();
 String logFileName = logFile.getName();
 
+// Corrupted indexes should not be uploaded to remote storage
+// Example case: Local storage was filled, which caused index corruption
+// We should avoid uploading such segments
+segment.timeIndex().sanityCheck();
+segment.offsetIndex().sanityCheck();
+segment.txnIndex().sanityCheck();
+
 logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   This should probably be moved before the sanity checks so that, during 
debugging, it is easy to understand which segment was being uploaded when an 
index check failed.
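   
   In other words, the suggested ordering would be roughly (a sketch over the diff above):
   
   ```
   // Log first, so a sanity-check failure can be tied to the segment being copied.
   logger.info("Copying {} to remote storage.", logFileName);
   segment.timeIndex().sanityCheck();
   segment.offsetIndex().sanityCheck();
   segment.txnIndex().sanityCheck();
   ```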



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, l
 }
 }
 
+@Test
+void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() 
throws Exception {
+long segmentStartOffset = 0L;
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+// create log segment, with 0 as log start offset
+LogSegment segment = mock(LogSegment.class);
+
+// Segment does not have a timeIndex(), which is not acceptable to sanity checks.
+// In that case the segment won't be copied.
+when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(segment);
+when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+when(mockLog.lastStableOffset()).thenReturn(150L);
+
+RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+task.copyLogSegmentsToRemote(mockLog);
+
+// verify the remoteLogMetadataManager never add any metadata and 
remoteStorageManager never copy log segments
+// Since segment with index corruption should not be uploaded
+verify(remoteLogMetadataManager, 
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+verify(remoteStorageManager, 
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), 
any(LogSegmentData.class));
+verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+}
+
+
+@Test
+void testCorruptedTimeIndexes() throws Exception {
+// copyLogSegment is executed when we have more than 1 segment, which is why we create 2 of them
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+File tempFile = TestUtils.tempFile();
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+File tempDir = TestUtils.tempDirectory();
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+// Mock all required segment data to be managed by RLMTask
+FileRecords fileRecords = mock(FileRecords.class);
+

Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514391934


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+cgcArgs.addAll(Arrays.asList(describeType));
+ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
+
+String output = 
kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));

Review Comment:
   Not for now.
   
   Scala caches the `System.out` value inside `Console#outVar`, so when we 
change the `System.out` value from Java code we can't capture Scala code output.
   
   But in the big PR (which moves the command itself) I use 
`ToolsTestUtils#captureStandardOut` instead of the Scala version.
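   
   A minimal Java sketch of the limitation being described (plain `System.setOut`; Scala's `Console` keeps printing to its cached stream):
   
   ```
   import java.io.ByteArrayOutputStream;
   import java.io.PrintStream;
   
   public class CaptureDemo {
       public static void main(String[] args) {
           ByteArrayOutputStream buf = new ByteArrayOutputStream();
           PrintStream old = System.out;
           System.setOut(new PrintStream(buf));
           try {
               System.out.println("captured");  // Java print lands in buf
               // scala.Console.println("...") would still hit the cached stream,
               // which is why the Java-side capture misses Scala output.
           } finally {
               System.setOut(old);
           }
           System.out.println("buffer held: " + buf.toString().trim());
       }
   }
   ```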



-- 
This is an automated message from the Apache Git Service.

Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514388497


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+cgcArgs.addAll(Arrays.asList(describeType));
+ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
+
+String output = 
kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));
+assertTrue(output.contains("Consumer group '" + missingGroup + "' 
does not exist."),
+"Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
+}
+}
+
+@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+@ValueSource(strings = {"zk", "kraft"})
+public void 

[jira] [Comment Edited] (KAFKA-16344) Internal topic mm2-offset-syncs.<cluster>.internal created with single partition is putting more load on the broker

2024-03-06 Thread Janardhana Gopalachar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823930#comment-17823930
 ] 

Janardhana Gopalachar edited comment on KAFKA-16344 at 3/6/24 12:19 PM:


Hi [~gharris1727],

While processing 24k events/second, the MM2 internal topic gets 10k events/sec; that 
is the event load the internal topic is receiving. Are there any parameters that can 
be tuned so that the CPU load on the broker instance that leads the internal topic 
can be distributed?

Regards

Jana


was (Author: JIRAUSER301625):
Hi [~gharris1727],

While processing 24k events/second, the MM2 internal topic gets 10k events/sec; that 
is the event load the internal topic is receiving.

Regards

Jana

> Internal topic mm2-offset-syncs.<cluster>.internal created with single 
> partition is putting more load on the broker
> -
>
> Key: KAFKA-16344
> URL: https://issues.apache.org/jira/browse/KAFKA-16344
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> We are using the Kafka 3.5.1 version, and we see that the internal topic created by 
> MirrorMaker, mm2-offset-syncs.<cluster>.internal, is created with a single 
> partition, due to which the CPU load on the broker that is the leader for this 
> partition is increased compared to the other brokers. Can multiple partitions be 
> created for the topic so that the CPU load would get distributed?
>  
> Topic: mm2-offset-syncs.cluster-a.internal    TopicId: XRvTDbogT8ytNhqX2YTyrA
> PartitionCount: 1    ReplicationFactor: 3    Configs: 
> min.insync.replicas=2,cleanup.policy=compact,message.format.version=3.0-IV1
>     Topic: mm2-offset-syncs.cluster-a.internal    Partition: 0    Leader: 2    
> Replicas: 2,1,0    Isr: 2,1,0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514369281


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+cgcArgs.addAll(Arrays.asList(describeType));
+ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
+
+String output = 
kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));

Review Comment:
   Can `kafka.utils.TestUtils.grabConsoleOutput` be replaced by 
`ToolsTestUtils#captureStandardOut`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on code in PR #15363:
URL: https://github.com/apache/kafka/pull/15363#discussion_r1514366938


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.Function0;
+import scala.Function1;
+import scala.Option;
+import scala.collection.Seq;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.RANDOM;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+private static final String[][] DESCRIBE_TYPE_OFFSETS = new String[][]{new 
String[]{""}, new String[]{"--offsets"}};
+private static final String[][] DESCRIBE_TYPE_MEMBERS = new String[][]{new 
String[]{"--members"}, new String[]{"--members", "--verbose"}};
+private static final String[][] DESCRIBE_TYPE_STATE = new String[][]{new 
String[]{"--state"}};
+private static final String[][] DESCRIBE_TYPES;
+
+static {
+List<String[]> describeTypes = new ArrayList<>();
+
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_OFFSETS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_MEMBERS));
+describeTypes.addAll(Arrays.asList(DESCRIBE_TYPE_STATE));
+
+DESCRIBE_TYPES = describeTypes.toArray(new String[0][0]);
+}
+
+@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
+public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
+createOffsetsTopic(listenerName(), new Properties());
+String missingGroup = "missing.group";
+
+for (String[] describeType : DESCRIBE_TYPES) {
+// note the group to be queried is a different (non-existing) group
+List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+cgcArgs.addAll(Arrays.asList(describeType));
+ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
+
+String output = 
kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));
+assertTrue(output.contains("Consumer group '" + missingGroup + "' 
does not exist."),
+"Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
+}
+}
+
+@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+@ValueSource(strings = {"zk", "kraft"})
+public void 

[jira] [Updated] (KAFKA-16346) Fix flaky MetricsTest.testMetrics

2024-03-06 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-16346:
--
Summary: Fix flaky MetricsTest.testMetrics  (was: Fix flay 
MetricsTest.testMetrics)

> Fix flaky MetricsTest.testMetrics
> -
>
> Key: KAFKA-16346
> URL: https://issues.apache.org/jira/browse/KAFKA-16346
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> {code}
> Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > 
> testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false 
> FAILED
> org.opentest4j.AssertionFailedError: Broker metric not recorded correctly 
> for 
> kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce
>  value 0.0 ==> expected:  but was: 
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> at 
> app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314)
> at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110)
> {code}
> The value used to update metrics is calculated by Math.round, so it could be 
> zero if you have a good machine :)
> We should verify the `count` instead of the `value`, since it is convincing and 
> more stable.
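> A hedged sketch of the suggested assertion change (the metric type and names here 
> are illustrative assumptions, not the actual test code):
> {code:java}
> import com.yammer.metrics.core.Histogram;
> import static org.junit.jupiter.api.Assertions.assertTrue;
> 
> class MetricAssertionSketch {
>     static void verifyConversionMetric(Histogram metric) {
>         // Flaky: Math.round(...) of a tiny elapsed time can legitimately be 0.
>         // assertTrue(metric.mean() > 0);
>         // Stable: the number of recorded events does not depend on timing.
>         assertTrue(metric.count() > 0, "Broker metric not recorded");
>     }
> }
> {code}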



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16348) Fix flaky TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2024-03-06 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu reassigned KAFKA-16348:
--

Assignee: Johnny Hsu

> Fix flaky 
> TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
> ---
>
> Key: KAFKA-16348
> URL: https://issues.apache.org/jira/browse/KAFKA-16348
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Minor
>
> {code:java}
> Gradle Test Run :tools:test > Gradle Test Executor 36 > 
> TopicCommandIntegrationTest > 
> testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String) > 
> testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft
>  FAILED
>     org.opentest4j.AssertionFailedError: --under-replicated-partitions 
> shouldn't return anything: 'Topic: 
> testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress-4l8dkZ6JT2  
> Partition: 0    Leader: 3       Replicas: 0,3   Isr: 3' ==> expected: <> but 
> was: <Topic: testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress-4l8dkZ6JT2  
> Partition: 0    Leader: 3       Replicas: 0,3   Isr: 3>
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>         at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
>         at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1156)
>         at 
> app//org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:827)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16348) Fix flaky TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2024-03-06 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16348:
--

 Summary: Fix flaky 
TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
 Key: KAFKA-16348
 URL: https://issues.apache.org/jira/browse/KAFKA-16348
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai


{code:java}
Gradle Test Run :tools:test > Gradle Test Executor 36 > 
TopicCommandIntegrationTest > 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String) > 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft 
FAILED
    org.opentest4j.AssertionFailedError: --under-replicated-partitions 
shouldn't return anything: 'Topic: 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress-4l8dkZ6JT2  
Partition: 0    Leader: 3       Replicas: 0,3   Isr: 3' ==> expected: <> but 
was: <Topic: 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress-4l8dkZ6JT2  
Partition: 0    Leader: 3       Replicas: 0,3   Isr: 3>
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
        at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
        at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1156)
        at 
app//org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:827)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16322.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
> --
>
> Key: KAFKA-16322
> URL: https://issues.apache.org/jira/browse/KAFKA-16322
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0
>
>
> https://devhub.checkmarx.com/cve-details/CVE-2023-50572/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16322 upgrade jline from 3.22.0 to 3.25.1 [kafka]

2024-03-06 Thread via GitHub


chia7712 merged PR #15464:
URL: https://github.com/apache/kafka/pull/15464


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16322 : upgrade jline [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15464:
URL: https://github.com/apache/kafka/pull/15464#issuecomment-1980679646

   all failed tests pass on my local machine
   ```sh
   ./gradlew cleanTest core:test --tests ReplicaManagerTest --tests 
AssignmentsManagerTest connect:runtime:test --tests OffsetsApiIntegrationTest 
tools:test --tests TopicCommandIntegrationTest metadata:test --tests 
QuorumControllerTest connect:mirror:test --tests DedicatedMirrorIntegrationTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-06 Thread via GitHub


showuon commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980653675

   @iit2009060 , good question. If we want to make `producerSnapshot` optional, we 
might need a simple KIP to change the RSM API. Are you able to do that?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-06 Thread via GitHub


hni61223 commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1514278368


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
   Snapshots.deleteIfExists(logDir, snapshotId)
 }
   }
+
+  private sealed trait SnapshotDeletionReason {
+def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, 
retentionMillis: Long) extends SnapshotDeletionReason {
+override def reason(snapshotId: OffsetAndEpoch): String = {
+  s"""Marking snapshot $snapshotId for deletion because it timestamp 
($timestamp) is now ($now) older than the

Review Comment:
   nit: its timestamp



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514277995


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index 
appeared to be corrupted for partition: {}  ", topicIdPartition, ex);

Review Comment:
   Agree and fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-06 Thread via GitHub


FrankYang0529 opened a new pull request, #15481:
URL: https://github.com/apache/kafka/pull/15481

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
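   
   For context on the fix area, `org.apache.kafka.common.utils.Sanitizer` round-trips 
principal names used in quota entity paths; a minimal illustration (the principal 
value is made up):
   ```java
   import org.apache.kafka.common.utils.Sanitizer;
   
   public class SanitizeRoundTrip {
       public static void main(String[] args) {
           String principal = "CN=user,O=Example";            // illustrative principal
           String sanitized = Sanitizer.sanitize(principal);  // encoded form stored in ZooKeeper paths
           String restored = Sanitizer.desanitize(sanitized); // must round-trip back
           System.out.println(sanitized + " -> " + restored);
       }
   }
   ```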
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514266315


##
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##
@@ -75,13 +75,14 @@ public void sanityCheck() {
 TimestampOffset entry = lastEntry();
 long lastTimestamp = entry.timestamp;
 long lastOffset = entry.offset;
-if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-+ "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-+ timestamp(mmap(), 0));
+
 if (entries() != 0 && lastOffset < baseOffset())
 throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
 + "non-zero size but the last offset is " + lastOffset + " 
which is less than the first offset " + baseOffset());
+if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))

Review Comment:
   In the test we are not mocking mmap(); that is a bit trickier to do. So 
when I write a test to check index corruption, I do it by satisfying this:
   ```
   if (entries() != 0 && lastOffset < baseOffset())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on PR #15479:
URL: https://github.com/apache/kafka/pull/15479#issuecomment-1980543598

   @showuon Would you be able to review this PR please? It's a fix to an 
earlier PR which you reviewed, so I think you'll have the context to understand 
it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1514201905


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
 @Override
 public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
 Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> 
deletionsForTopic = new HashMap<>();
-for (Map.Entry<TopicPartition, RecordsToDelete> entry : 
recordsToDelete.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
+for (TopicPartition topicPartition : keys) {
+RecordsToDelete toDelete = recordsToDelete.get(topicPartition);
 DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.computeIfAbsent(
 topicPartition.topic(),
 key -> new 
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
 );
 deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
 .setPartitionIndex(topicPartition.partition())
-.setOffset(entry.getValue().beforeOffset()));
+.setOffset(toDelete.beforeOffset()));

Review Comment:
   Yes, in my opinion it would be unduly paranoid. I would end up writing 
conditional logic for a situation which will never occur, and it would differ 
from other code in this area of AdminClient, which also accesses a map in this 
way, confidently expecting the entry to be present.
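   
   For illustration, the grouping in `buildBatchedRequest` boils down to this pattern 
(simplified sketch with hypothetical result types, not the handler code itself):
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.common.TopicPartition;
   
   class GroupByTopicSketch {
       // Group the partition keys routed to one broker by topic name;
       // computeIfAbsent creates each topic's bucket on first use.
       static Map<String, List<Integer>> groupByTopic(Set<TopicPartition> keys) {
           Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
           for (TopicPartition tp : keys) {
               partitionsByTopic
                   .computeIfAbsent(tp.topic(), t -> new ArrayList<>())
                   .add(tp.partition());
           }
           return partitionsByTopic;
       }
   }
   ```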



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16322 : upgrade jline [kafka]

2024-03-06 Thread via GitHub


johnnychhsu commented on PR #15464:
URL: https://github.com/apache/kafka/pull/15464#issuecomment-1980499464

   the test is flaky, since the currently failing pipeline succeeded in the previous run


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1980469311

   @chia7712 This PR ready for review.
   
   Please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-06 Thread via GitHub


iit2009060 commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980469009

   Yes @showuon I will raise a PR for it. Need one clarification: 
   as per the code, when we copy a 
[logSegment](https://github.com/iit2009060/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L744),
   producerSnapshot is a required parameter. 
   Should we change this behaviour to be similar to the transaction index, where 
producerSnapshot can also be an optional parameter?
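   
   For discussion, a hedged sketch of the shape such a change could take (the class 
and field names are assumptions for illustration, not the actual RSM API):
   ```java
   import java.nio.file.Path;
   import java.util.Optional;
   
   // Hypothetical: producerSnapshot becomes Optional, mirroring how the
   // transaction index is already optional in the segment upload payload.
   final class SegmentUploadSketch {
       final Path logSegment;
       final Optional<Path> transactionIndex; // optional today
       final Optional<Path> producerSnapshot; // proposed: optional as well
   
       SegmentUploadSketch(Path logSegment,
                           Optional<Path> transactionIndex,
                           Optional<Path> producerSnapshot) {
           this.logSegment = logSegment;
           this.transactionIndex = transactionIndex;
           this.producerSnapshot = producerSnapshot;
       }
   }
   ```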
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on PR #15457:
URL: https://github.com/apache/kafka/pull/15457#issuecomment-1980452029

   Hello, @mimaison
   Please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514119801


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Maybe you should create a new constructor for TopicPartitionInfo that 
doesn't require a Node. This faking is a bit ugly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514117940


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   The client sends one. There will be multiple client implementations over 
time, so it's prudent to ensure that a malformed cursor is caught properly, 
just in case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


nizhikov commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1980426424

   @chia7712 Thank you for the review and merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16252) Maligned Metrics formatting

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16252.

Resolution: Fixed

> Maligned Metrics formatting
> ---
>
> Key: KAFKA-16252
> URL: https://issues.apache.org/jira/browse/KAFKA-16252
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 3.6.1
>Reporter: James
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
> Fix For: 3.8.0
>
> Attachments: image-2024-02-13-22-34-31-371.png
>
>
> There are some inconsistencies, and I believe some genuinely buggy content, in 
> the documentation for monitoring Kafka.
> 1. Some MBean documentation is presented as a TR with a colspan of 3, instead 
> of in its normal location, the third column.
> 2. There seems to be some erroneous data posted in the headings for a handful 
> of documentation sections, e.g.:
> {code:java}
>  [2023-09-15 00:40:42,725] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:693) [2023-09-15 00:40:42,729] INFO 
> Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)  {code}
> Links to erroneous content (not permalinks)
>  
>  * [https://kafka.apache.org/documentation/#producer_sender_monitoring]
>  * [https://kafka.apache.org/documentation/#consumer_fetch_monitoring]
>  * [https://kafka.apache.org/documentation/#connect_monitoring]
> This image demonstrates both issues
> !image-2024-02-13-22-34-31-371.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16252) Maligned Metrics formatting

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16252:
---
Fix Version/s: 3.8.0

> Maligned Metrics formatting
> ---
>
> Key: KAFKA-16252
> URL: https://issues.apache.org/jira/browse/KAFKA-16252
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 3.6.1
>Reporter: James
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
> Fix For: 3.8.0
>
> Attachments: image-2024-02-13-22-34-31-371.png
>
>
> There are some inconsistencies, and I believe some genuinely buggy content, in 
> the documentation for monitoring Kafka.
> 1. Some MBean documentation is presented as a TR with a colspan of 3, instead 
> of in its normal location, the third column.
> 2. There seems to be some erroneous data posted in the headings for a handful 
> of documentation sections, e.g.:
> {code:java}
>  [2023-09-15 00:40:42,725] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:693) [2023-09-15 00:40:42,729] INFO 
> Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)  {code}
> Links to erroneous content (not permalinks)
>  
>  * [https://kafka.apache.org/documentation/#producer_sender_monitoring]
>  * [https://kafka.apache.org/documentation/#consumer_fetch_monitoring]
>  * [https://kafka.apache.org/documentation/#connect_monitoring]
> This image demonstrates both issues
> !image-2024-02-13-22-34-31-371.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh [kafka]

2024-03-06 Thread via GitHub


dengziming commented on PR #15304:
URL: https://github.com/apache/kafka/pull/15304#issuecomment-1980422478

   @dajac Hello, this PR is ready for review except the upgrade docs, PTAL.
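   
   For reviewers who want to try the target API, `Admin#incrementalAlterConfigs` is 
used roughly like this (broker id and config values are illustrative):
   ```java
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class IncrementalAlterDemo {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
               // SET touches only this one key; other broker configs stay as they are,
               // which is the advantage over the legacy (full-replace) alterConfigs.
               AlterConfigOp op = new AlterConfigOp(
                   new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
               admin.incrementalAlterConfigs(
                   Collections.singletonMap(broker, Collections.singleton(op))).all().get();
           }
       }
   }
   ```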


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-06 Thread via GitHub


chia7712 merged PR #15473:
URL: https://github.com/apache/kafka/pull/15473


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1980412147

   @nizhikov thanks for this refactor. Let us deal with the next PR :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 merged PR #15365:
URL: https://github.com/apache/kafka/pull/15365


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-06 Thread via GitHub


chia7712 commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1980410184

   the failed tests pass on my machine.
   ```sh
   ./gradlew cleanTest core:test --tests LogDirFailureTest --tests 
ReplicaManagerTest --tests TransactionsTest --tests SaslPlaintextConsumerTest 
tools:test --tests MetadataQuorumCommandTest
   ```
   
   will merge it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncs.<cluster>.internal created with single partition is putting more load on the broker

2024-03-06 Thread Janardhana Gopalachar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823930#comment-17823930
 ] 

Janardhana Gopalachar commented on KAFKA-16344:
---

Hi [~gharris1727],

While processing 24k events/second, the MM2 internal topic gets 10k events/sec; that 
is the event load the internal topic is receiving.

Regards

Jana

> Internal topic mm2-offset-syncs.<cluster>.internal created with single 
> partition is putting more load on the broker
> -
>
> Key: KAFKA-16344
> URL: https://issues.apache.org/jira/browse/KAFKA-16344
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> We are using the Kafka 3.5.1 version, and we see that the internal topic created by 
> MirrorMaker, mm2-offset-syncs.<cluster>.internal, is created with a single 
> partition, due to which the CPU load on the broker that is the leader for this 
> partition is increased compared to the other brokers. Can multiple partitions be 
> created for the topic so that the CPU load would get distributed?
>  
> Topic: mm2-offset-syncs.cluster-a.internal    TopicId: XRvTDbogT8ytNhqX2YTyrA
> PartitionCount: 1    ReplicationFactor: 3    Configs: 
> min.insync.replicas=2,cleanup.policy=compact,message.format.version=3.0-IV1
>     Topic: mm2-offset-syncs.cluster-a.internal    Partition: 0    Leader: 2    
> Replicas: 2,1,0    Isr: 2,1,0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


dajac commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514092183


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using 
DescribeTopicPartitions API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);

Review Comment:
   Don't we break the contract of the admin api if we do this? The expectation 
is that the real nodes are returned. We could perhaps get them from the 
metadata?
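   
   For illustration, one way to do that (sketch only; error handling elided) is to 
build an id-to-Node map from `describeCluster` and resolve replica ids against it:
   ```java
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.common.Node;
   
   class NodeResolverSketch {
       // Resolve replica ids to real Nodes instead of fabricating placeholders.
       static Map<Integer, Node> nodesById(Admin admin) throws Exception {
           Collection<Node> nodes = admin.describeCluster().nodes().get();
           Map<Integer, Node> byId = new HashMap<>();
           for (Node node : nodes) {
               byId.put(node.id(), node);
           }
           return byId;
       }
   }
   ```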



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16347) Bump ZooKeeper to 3.8.4

2024-03-06 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823918#comment-17823918
 ] 

Chia-Ping Tsai commented on KAFKA-16347:


trunk
https://github.com/apache/kafka/commit/ae047bbe56e8dc37ac18472ee631a14e1b35be82

3.7
https://github.com/apache/kafka/commit/174746ca5715b2698d4481b82b328d7f1e4ad5ef

3.6
https://github.com/apache/kafka/commit/6770b477daf2591b2ea37df546d80a5021304897

> Bump ZooKeeper to 3.8.4
> ---
>
> Key: KAFKA-16347
> URL: https://issues.apache.org/jira/browse/KAFKA-16347
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mickael Maison
>Assignee: Cheng-Kai, Zhang
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> ZooKeeper 3.8.4 was released and contains a few CVE fixes: 
> https://zookeeper.apache.org/doc/r3.8.4/releasenotes.html
> We should update 3.6, 3.7 and trunk to use this new ZooKeeper release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16347) Bump ZooKeeper to 3.8.4

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16347.

Resolution: Fixed

[~kevinztw] thanks for this contribution!

> Bump ZooKeeper to 3.8.4
> ---
>
> Key: KAFKA-16347
> URL: https://issues.apache.org/jira/browse/KAFKA-16347
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mickael Maison
>Assignee: Cheng-Kai, Zhang
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> ZooKeeper 3.8.4 was released and contains a few CVE fixes: 
> https://zookeeper.apache.org/doc/r3.8.4/releasenotes.html
> We should update 3.6, 3.7 and trunk to use this new ZooKeeper release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16347) Bump ZooKeeper to 3.8.4

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16347:
---
Fix Version/s: 3.6.2
   3.7.1

> Bump ZooKeeper to 3.8.4
> ---
>
> Key: KAFKA-16347
> URL: https://issues.apache.org/jira/browse/KAFKA-16347
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mickael Maison
>Assignee: Cheng-Kai, Zhang
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> ZooKeeper 3.8.4 was released and contains a few CVE fixes: 
> https://zookeeper.apache.org/doc/r3.8.4/releasenotes.html
> We should update 3.6, 3.7 and trunk to use this new ZooKeeper release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

