Re: [PR] KAFKA-15879: Add documentation and examples for docker image [kafka]

2023-12-06 Thread via GitHub


omkreddy commented on code in PR #14938:
URL: https://github.com/apache/kafka/pull/14938#discussion_r1418474509


##
docker/examples/jvm/cluster/docker-compose.yml:
##
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+version: '1'

Review Comment:
   It would be great if we could add a multi-node example with multiple listeners, such as SSL and SASL_SSL (PLAIN mechanism), with an authorizer configured. This can be added in another PR.



##
docker/examples/jvm/cluster/docker-compose.yml:
##
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+version: '1'

Review Comment:
   Can we also add a compose file for multi-node combined mode?






Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-06 Thread via GitHub


mdedetrich commented on PR #12728:
URL: https://github.com/apache/kafka/pull/12728#issuecomment-1844822620

   I have just pushed a merge commit which adds the `testCreateConnectorWithStoppedInitialState()` test. The translation of the test was quite straightforward; the only thing to note is that the original test used `PowerMock.verifyAll();` as an escape hatch, and I replaced it with `verify(loaderSwap).close();`. I'm not sure if you want to add any more verifications (or even remove `verify(loaderSwap).close();` as misleading).
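   For readers unfamiliar with the pattern, here is a minimal sketch of the Mockito-style verification described above; the `Plugins` and `LoaderSwap` interfaces are simplified stand-ins, not the actual Connect classes:

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;

   public class LoaderSwapVerifyExample {

       // Simplified stand-ins for the Connect plugin classes (illustrative only).
       interface LoaderSwap extends AutoCloseable {
           @Override
           void close();
       }

       interface Plugins {
           LoaderSwap withClassLoader(ClassLoader loader);
       }

       public static void main(String[] args) {
           Plugins plugins = mock(Plugins.class);
           LoaderSwap loaderSwap = mock(LoaderSwap.class);
           when(plugins.withClassLoader(ClassLoader.getSystemClassLoader())).thenReturn(loaderSwap);

           // Code under test: swap in the plugin class loader, do work, swap back.
           try (LoaderSwap swap = plugins.withClassLoader(ClassLoader.getSystemClassLoader())) {
               // ... run connector code under the plugin class loader ...
           }

           // Unlike PowerMock.verifyAll(), which checks every recorded expectation,
           // Mockito verifies one explicit interaction at a time:
           verify(loaderSwap).close();
       }
   }
   ```

   The trade-off mentioned above follows from this: explicit `verify` calls make the asserted behavior visible, at the cost of having to enumerate each one.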





Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]

2023-12-06 Thread via GitHub


dajac commented on PR #14925:
URL: https://github.com/apache/kafka/pull/14925#issuecomment-1844810244

   Triggered a new build.





Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


dajac commented on PR #14774:
URL: https://github.com/apache/kafka/pull/14774#issuecomment-1844806301

   @junrao @jolshan I am working on implementing the transactional offset commits in the new world. I will take care of adding the verification steps there when this PR is done. We can replicate the same pattern there.





Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2023-12-06 Thread via GitHub


tledkov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1418448844


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
+public static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
+
+public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+public static final String GROUP_DOC = "The consumer group we wish to act 
on.";
+public static final String TOPIC_DOC = "The topic whose consumer group 
information should be deleted or topic whose should be included in the reset 
offset process. " +
+"In `reset-offsets` case, partitions can be specified using this 
format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the 
process. " +
+"Reset-offsets also supports multiple topic inputs.";
+public static final String ALL_TOPICS_DOC = "Consider all topics assigned 
to a group in the `reset-offsets` process.";
+public static final String LIST_DOC = "List all consumer groups.";
+public static final String DESCRIBE_DOC = "Describe consumer group and 
list offset lag (number of messages not yet processed) related to given group.";
+public static final String ALL_GROUPS_DOC = "Apply to all consumer 
groups.";
+public static final String NL = System.getProperty("line.separator");
+public static final String DELETE_DOC = "Pass in groups to delete topic 
partition offsets and ownership information " +
+"over the entire consumer group. For instance --group g1 --group g2";
+public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
+"to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes (when the group is just created, " +
+"or is going through some changes).";
+public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client and Consumer.";
+public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer 
group. Supports one consumer group at the time, and instances should be 
inactive" + NL +
+"Has 2 execution options: --dry-run (the default) to plan which 
offsets to reset, and --execute to update the offsets. " +
+"Additionally, the --export option is used to export the results to a 
CSV format." + NL +
+"You must choose one of the following reset specifications: 
--to-datetime, --by-duration, --to-earliest, " +
+"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + 
NL +
+"To define the scope use --all-topics or --topic. One scope must be 
specified unless you use '--from-file'.";
+public static final String DRY_RUN_DOC = "Only show results without 
executing changes on Consumer Groups. Supported operations: reset-offsets.";
+public static final String EXECUTE_DOC = "Execute operation. Supported 
operations: reset-offsets.";
+public static final String EXPORT_DOC = "Export operation execution to a 
CSV file. Supported operations: reset-offsets.";
+public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a 
specific offset.";
+public static final String RESET_FROM_FILE_DOC = "Reset offsets to values 
defined in CSV file.";
+public static final String RESET_TO_DATETIME_DOC = "Reset offsets to 
offset from datetime. Format: '-MM-DDTHH:mm:SS.sss'";
+public static final String RESET_BY_DURATION_DOC = "Reset offsets to 
offset by duration from current timestamp. Format: 

Re: [PR] KAFKA-15903: Add Github Actions Workflow for RC Docker Image [kafka]

2023-12-06 Thread via GitHub


omkreddy merged PR #14940:
URL: https://github.com/apache/kafka/pull/14940





Re: [PR] KAFKA-15880: Add Github Actions Workflow for promoting docker image [kafka]

2023-12-06 Thread via GitHub


omkreddy merged PR #14941:
URL: https://github.com/apache/kafka/pull/14941





Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2023-12-06 Thread via GitHub


tledkov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1418426538


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
+public static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
+
+public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+public static final String GROUP_DOC = "The consumer group we wish to act 
on.";
+public static final String TOPIC_DOC = "The topic whose consumer group 
information should be deleted or topic whose should be included in the reset 
offset process. " +
+"In `reset-offsets` case, partitions can be specified using this 
format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the 
process. " +
+"Reset-offsets also supports multiple topic inputs.";
+public static final String ALL_TOPICS_DOC = "Consider all topics assigned 
to a group in the `reset-offsets` process.";
+public static final String LIST_DOC = "List all consumer groups.";
+public static final String DESCRIBE_DOC = "Describe consumer group and 
list offset lag (number of messages not yet processed) related to given group.";
+public static final String ALL_GROUPS_DOC = "Apply to all consumer 
groups.";
+public static final String NL = System.getProperty("line.separator");

Review Comment:
   ```suggestion
   public static final String NL = System.lineSeparator();
   ```






Re: [PR] KAFKA-15980: add KIP-1001 CurrentControllerId metric [kafka]

2023-12-06 Thread via GitHub


cmccabe merged PR #14749:
URL: https://github.com/apache/kafka/pull/14749





Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]

2023-12-06 Thread via GitHub


cmccabe commented on PR #14899:
URL: https://github.com/apache/kafka/pull/14899#issuecomment-1844406974

   Jenkins had an internal issue. Re-pushing to get a clean build.





Re: [PR] KAFKA-15347: fix unit test [kafka]

2023-12-06 Thread via GitHub


mjsax merged PR #14947:
URL: https://github.com/apache/kafka/pull/14947





Re: [PR] KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException [kafka]

2023-12-06 Thread via GitHub


github-actions[bot] commented on PR #13726:
URL: https://github.com/apache/kafka/pull/13726#issuecomment-1844200113

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15178: Improve ConsumerCoordinator.poll perf [kafka]

2023-12-06 Thread via GitHub


github-actions[bot] commented on PR #13993:
URL: https://github.com/apache/kafka/pull/13993#issuecomment-1844199926

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax opened a new pull request, #14951:
URL: https://github.com/apache/kafka/pull/14951

   Stacked on other PRs. Still not completed. But early feedback is welcome.





Re: [PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-06 Thread via GitHub


mjsax merged PR #14898:
URL: https://github.com/apache/kafka/pull/14898





Re: [PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-06 Thread via GitHub


mjsax commented on code in PR #14898:
URL: https://github.com/apache/kafka/pull/14898#discussion_r1418202440


##
docs/streams/upgrade-guide.html:
##
@@ -139,6 +139,10 @@ Streams API
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> to allow user to receive data in descending order.
 
 
+
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery">KIP-992</a> add two new query types, namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result. The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changes to always return the value only, also for timestamped key-value stores.

Review Comment:
   ```suggestion
   <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery">KIP-992</a> adds two new query types,
   namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
   The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
   ```
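   To make the new query types concrete, here is a hedged sketch of issuing a `TimestampedKeyQuery` through IQv2, following the API shape described in KIP-992; the store name `"counts"` and the key are illustrative assumptions:

   ```java
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.query.QueryResult;
   import org.apache.kafka.streams.query.StateQueryRequest;
   import org.apache.kafka.streams.query.StateQueryResult;
   import org.apache.kafka.streams.query.TimestampedKeyQuery;
   import org.apache.kafka.streams.state.ValueAndTimestamp;

   public class TimestampedKeyQueryExample {

       // Assumes a running KafkaStreams instance backed by a timestamped
       // key-value store named "counts" (both assumptions, for illustration).
       static void queryLatestWithTimestamp(final KafkaStreams streams) {
           final TimestampedKeyQuery<String, Long> query = TimestampedKeyQuery.withKey("alice");

           final StateQueryRequest<ValueAndTimestamp<Long>> request =
               StateQueryRequest.inStore("counts").withQuery(query);

           final StateQueryResult<ValueAndTimestamp<Long>> result = streams.query(request);

           // Unlike the plain KeyQuery (which now returns only the value), the
           // timestamped query retrieves the value together with its timestamp.
           final QueryResult<ValueAndTimestamp<Long>> partitionResult = result.getOnlyPartitionResult();
           final ValueAndTimestamp<Long> valueAndTimestamp = partitionResult.getResult();
           System.out.printf("value=%d, timestamp=%d%n",
               valueAndTimestamp.value(), valueAndTimestamp.timestamp());
       }
   }
   ```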






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


kirktrue commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418183946


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+log.trace("Closing the consumer network thread");
+Timer timer = time.timer(closeTimeout);
 try {
-log.trace("Closing the consumer network thread");
-Timer timer = time.timer(closeTimeout);
-maybeAutocommitOnClose(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, 
timer);
-maybeLeaveGroup(timer);
 } catch (Exception e) {
 log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
 } finally {
+networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   Network requests are tied to the `CompletableApplicationEvent`s, right? Can we just rely on the events to wait for their network I/O to complete via the `addAndGet()` method?
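   For context, a small sketch of the blocking pattern this comment refers to: the event carries a future that the application thread waits on while the network thread completes it. The names below are simplified stand-ins for the consumer internals, not the actual implementation:

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class AddAndGetSketch {

       // Simplified stand-in for a CompletableApplicationEvent.
       static class CompletableEvent<T> {
           final CompletableFuture<T> future = new CompletableFuture<>();
       }

       private final BlockingQueue<CompletableEvent<?>> eventQueue = new LinkedBlockingQueue<>();

       // Enqueue the event for the background network thread, then block the
       // application thread until the event completes or the timeout expires.
       <T> T addAndGet(CompletableEvent<T> event, long timeoutMs)
               throws InterruptedException, ExecutionException, TimeoutException {
           eventQueue.add(event);
           return event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
       }
   }
   ```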



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class ConsumerCloseApplicationEvent extends 
CompletableApplicationEvent {

Review Comment:
   Why not just have separate event types as per the rest of the codebase?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class ConsumerCloseApplicationEvent extends 
CompletableApplicationEvent {

Review Comment:
   I'm happy to have a superclass for 'close' events, but having a type and a 
task gets a bit muddy, doesn't it?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -45,15 +46,19 @@ public class ApplicationEventProcessor extends 
EventProcessor
 private final Logger log;
 private final ConsumerMetadata metadata;
 private final RequestManagers requestManagers;
+private final NetworkClientDelegate networkClientDelegate;

Review Comment:
   I'm uncomfortable with introducing the `NetworkClientDelegate` at this layer. It's centralized in `ConsumerNetworkThread` so that we can reason about where the various network I/O is performed.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -113,6 +118,10 @@ public void process(ApplicationEvent event) {
 process((UnsubscribeApplicationEvent) event);
 return;
 
+case PREP_CLOSING:
+processPrepClosingEvent((ConsumerCloseApplicationEvent) event);
+return;
+

Review Comment:
   Any reason we can't have these as separate types like the other events?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent 
event) {
 event.chain(future);
 }
 

[jira] [Commented] (KAFKA-12670) KRaft support for unclean.leader.election.enable

2023-12-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793974#comment-17793974
 ] 

Luke Chen commented on KAFKA-12670:
---

[~cmccabe], do you have any idea when KRaft will support unclean leader 
election? 

> KRaft support for unclean.leader.election.enable
> 
>
> Key: KAFKA-12670
> URL: https://issues.apache.org/jira/browse/KAFKA-12670
> Project: Kafka
>  Issue Type: Task
>Reporter: Colin McCabe
>Assignee: Ryan Dielhenn
>Priority: Major
>
> Implement KRaft support for the unclean.leader.election.enable 
> configurations.  These configurations can be set at the topic, broker, or 
> cluster level.





Re: [PR] MINOR: remove the use of ConsumerTestBuilder from MembershipManagerImplTest [kafka]

2023-12-06 Thread via GitHub


kirktrue commented on PR #14950:
URL: https://github.com/apache/kafka/pull/14950#issuecomment-1843973958

   @lucasbru—please add the `ctr` tag. If you can also review the code, even better!





[PR] MINOR: remove the use of ConsumerTestBuilder from MembershipManagerImplTest [kafka]

2023-12-06 Thread via GitHub


kirktrue opened a new pull request, #14950:
URL: https://github.com/apache/kafka/pull/14950

   This is one in a series of changes to reduce/remove reliance on the 
`ConsumerTestBuilder` for CTR-related tests.





[jira] [Created] (KAFKA-15984) Client disconnections can cause hanging transactions on __consumer_offsets

2023-12-06 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15984:
--

 Summary: Client disconnections can cause hanging transactions on 
__consumer_offsets
 Key: KAFKA-15984
 URL: https://issues.apache.org/jira/browse/KAFKA-15984
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


When investigating frequent hanging transactions on __consumer_offsets partitions, we realized that many of them were caused by the same offset being committed with duplicates, one of which had `"isDisconnectedClient":true`.

TxnOffsetCommits do not have sequence numbers and thus are not protected against duplicates in the same way idempotent produce requests are. Thus, when a client is disconnected (and flushes its requests), we may see the duplicate get appended to the log.

KIP-890 part 1 should protect against this, as the duplicate will not pass verification. KIP-890 part 2 strengthens this further, as duplicates (from previous transactions) cannot be added to new transactions if the partition is re-added, since the epoch will be bumped.

Another possible solution is to do duplicate checking on the group coordinator side when the request comes in. This solution could be used instead of KIP-890 part 1 to prevent hanging transactions, but given that part 1 only has one open PR remaining, we may not need to do this. However, it can also prevent duplicates from being added to a new transaction – something only part 2 will protect against.
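As a rough illustration of the protection being discussed (not Kafka's actual producer-state code), idempotent produce requests carry a per-partition sequence number that lets the broker detect and drop retried duplicates; this is exactly what TxnOffsetCommit requests lack:

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceDedupSketch {

    // Last sequence number appended per (producerId, partition). A real broker
    // tracks this in producer state; this map is purely illustrative.
    private final Map<String, Integer> lastSequence = new HashMap<>();

    // Returns true if the batch is new, false if it is a duplicate retry.
    boolean tryAppend(long producerId, int partition, int baseSequence) {
        String key = producerId + "-" + partition;
        Integer last = lastSequence.get(key);
        if (last != null && baseSequence <= last) {
            return false; // duplicate: this batch was already appended, drop it
        }
        lastSequence.put(key, baseSequence);
        return true;
    }
}
```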





[jira] [Comment Edited] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-12-06 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793972#comment-17793972
 ] 

Phuc Hong Tran edited comment on KAFKA-15556 at 12/7/23 12:51 AM:
--

There is a duplicated method "nodeUnavailable" which has the same functionality as "isUnavailable" in the NetworkClientDelegate class.


was (Author: JIRAUSER301295):
There is a duplicated method "nodeUnavailable" which have the same functionalit 
as "isUnavailable" in the NetworkClientDelegate class

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.





[jira] [Commented] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-12-06 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793972#comment-17793972
 ] 

Phuc Hong Tran commented on KAFKA-15556:


There is a duplicated method "nodeUnavailable" which has the same functionality as "isUnavailable" in the NetworkClientDelegate class.

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.





[jira] [Updated] (KAFKA-15983) Kafka-acls should return authorization already done if repeating work is issued

2023-12-06 Thread Chen He (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen He updated KAFKA-15983:

Summary: Kafka-acls should return authorization already done if repeating 
work is issued  (was: Kafka-acls should return authorization already done if 
repeat work is issued)

> Kafka-acls should return authorization already done if repeating work is 
> issued
> ---
>
> Key: KAFKA-15983
> URL: https://issues.apache.org/jira/browse/KAFKA-15983
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 3.6.0
>Reporter: Chen He
>Priority: Minor
>  Labels: newbie
>
> The kafka-acls.sh command will always perform the normal operation even if the customer has already authorized the user. It should report something like "user {} already authorized with {} resources" instead of doing it again and again.





[jira] [Updated] (KAFKA-15983) Kafka-acls should return authorization already done if repeat work is issued

2023-12-06 Thread Chen He (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen He updated KAFKA-15983:

Labels: newbie  (was: )

> Kafka-acls should return authorization already done if repeat work is issued
> 
>
> Key: KAFKA-15983
> URL: https://issues.apache.org/jira/browse/KAFKA-15983
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 3.6.0
>Reporter: Chen He
>Priority: Minor
>  Labels: newbie
>
> The kafka-acls.sh command will always perform the normal operation even if the customer has already authorized the user. It should report something like "user {} already authorized with {} resources" instead of doing it again and again.





[jira] [Created] (KAFKA-15983) Kafka-acls should return authorization already done if repeat work is issued

2023-12-06 Thread Chen He (Jira)
Chen He created KAFKA-15983:
---

 Summary: Kafka-acls should return authorization already done if 
repeat work is issued
 Key: KAFKA-15983
 URL: https://issues.apache.org/jira/browse/KAFKA-15983
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 3.6.0
Reporter: Chen He


The kafka-acls.sh command will always perform the normal operation even if the customer has already authorized the user. It should report something like "user {} already authorized with {} resources" instead of doing it again and again.
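A hedged sketch of the duplicate check being suggested, built on the public `Admin` API (`describeAcls`/`createAcls`); the principal, topic, and bootstrap address below are illustrative:

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AclAlreadyDoneCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AclBinding binding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
            new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));

        try (Admin admin = Admin.create(props)) {
            // Look up existing ACLs matching this binding before creating it.
            boolean exists = admin.describeAcls(binding.toFilter()).values().get()
                .stream().anyMatch(binding::equals);
            if (exists) {
                System.out.println("user User:alice already authorized for topic orders");
            } else {
                admin.createAcls(List.of(binding)).all().get();
            }
        }
    }
}
```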





Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.15.3 [kafka]

2023-12-06 Thread via GitHub


bmscomp commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1843889093

   @mimaison Done! 
   





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418129061


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {

Review Comment:
   Really not much to do when shutting down the network thread - we will try one more time to send the unsent requests and poll the network client to make sure all requests are sent.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+prepareShutdown(closeTimer, firstException);

Review Comment:
   Completing the tasks before shutting down the network thread. 






Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-06 Thread via GitHub


mdedetrich commented on PR #12728:
URL: https://github.com/apache/kafka/pull/12728#issuecomment-1843872683

   > @mdedetrich Thanks, the changes look good to go. But the build is failing 
because there are new tests on trunk that still use EasyMock/PowerMock. Can you 
rebase on the latest from trunk?
   
   Yup doing so now





Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418127368


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -941,39 +851,209 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: 
mutable.Map[TopicPartition, VerificationGuard],
-  entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
-  verifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  unverifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  errorEntries: 
mutable.Map[TopicPartition, Errors]): Unit= {
-val transactionalProducerIds = mutable.HashSet[Long]()
+  /**
+   * Handles the produce request by starting any transactional verification 
before appending.
+   *
+   * @param timeout   maximum time we will wait to append 
before returning
+   * @param requiredAcks  number of replicas who must 
acknowledge the append before sending the response
+   * @param internalTopicsAllowed boolean indicating whether internal 
topics can be appended to
+   * @param originsource of the append request (ie, 
client, replication, coordinator)
+   * @param entriesPerPartition   the records per partition to be 
appended
+   * @param responseCallback  callback for sending the response
+   * @param delayedProduceLocklock for the delayed actions
+   * @param recordValidationStatsCallback callback for updating stats on 
record conversions
+   * @param requestLocal  container for the stateful instances 
scoped to this request -- this must correspond to the
+   *  thread calling this method
+   * @param actionQueue   the action queue to use. 
ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuardsthe mapping from topic partition to 
verification guards if transaction verification is used
+   */
+  def handleProduceAppend(timeout: Long,
+  requiredAcks: Short,
+  internalTopicsAllowed: Boolean,
+  origin: AppendOrigin,
+  transactionalId: String,
+  entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+  responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
+  recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
+  requestLocal: RequestLocal = RequestLocal.NoCaching,
+  actionQueue: ActionQueue = this.defaultActionQueue): 
Unit = {
+
+val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
+val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
 entriesPerPartition.foreach { case (topicPartition, records) =>
-  try {
-// Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
-val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
-transactionalBatches.foreach(batch => 
transactionalProducerIds.add(batch.producerId))
-
-if (transactionalBatches.nonEmpty) {
-  // We return VerificationGuard if the partition needs to be 
verified. If no state is present, no need to verify.
-  val firstBatch = records.firstBatch
-  val verificationGuard = 
getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId,
 firstBatch.baseSequence, firstBatch.producerEpoch)
-  if (verificationGuard != VerificationGuard.SENTINEL) {
-verificationGuards.put(topicPartition, verificationGuard)
-unverifiedEntries.put(topicPartition, records)
-  } else
-verifiedEntries.put(topicPartition, records)
-} else {
-  // If there is no producer ID or transactional records in the 
batches, no need to verify.
-  verifiedEntries.put(topicPartition, records)
-}
-  } catch {
-case e: Exception => errorEntries.put(topicPartition, 
Errors.forException(e))
-  }
+  // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
+  val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
+  transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
+  if (!transactionalBatches.isEmpty) 

Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843871662

   Hi @kirktrue - I rewrote the previous PR based on your feedback. I thought driving the close via an event is a better and clearer pattern, so thanks for the suggestions. Would you have time to take a look at this PR?
   
   @lucasbru - Thanks for reviewing the PR - I've decided, per your suggestion, to use Kirk's approach to closing the consumer. Let me know what you think.





[PR] KAFKA-15215: docs for KIP-954 [kafka]

2023-12-06 Thread via GitHub


agavra opened a new pull request, #14949:
URL: https://github.com/apache/kafka/pull/14949

   (no comment)





Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14920:
URL: https://github.com/apache/kafka/pull/14920#issuecomment-1843868318

   Thanks @lucasbru and @kirktrue - i'm closing this one and reopening another 
one.





Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-06 Thread via GitHub


philipnee closed pull request #14920: KAFKA-15696: Refactor AsyncConsumer close 
procedure
URL: https://github.com/apache/kafka/pull/14920





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax opened a new pull request, #14948:
URL: https://github.com/apache/kafka/pull/14948

   Part of KIP-714.
   
   Adds support for exposing the producer client instance id. This PR only adds support for the thread producer, with the state updater disabled.
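   For orientation, a sketch of how the Streams API proposed in KIP-714 would be used once this lands; the `clientInstanceIds()` method and the `ClientInstanceIds` accessors follow the KIP, while the topology and configuration are illustrative:

   ```java
   import java.time.Duration;
   import java.util.Properties;

   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.streams.ClientInstanceIds;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;

   public class ClientInstanceIdsExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "telemetry-demo");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

           StreamsBuilder builder = new StreamsBuilder();
           builder.stream("input-topic").to("output-topic");

           try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
               streams.start();

               // Ask all internal clients for their telemetry instance ids,
               // waiting up to 10 seconds (API shape per KIP-714).
               ClientInstanceIds ids = streams.clientInstanceIds(Duration.ofSeconds(10));
               Uuid adminId = ids.adminInstanceId();
               ids.producerInstanceIds().forEach((name, id) ->
                   System.out.println("producer " + name + " -> " + id));
               ids.consumerInstanceIds().forEach((name, id) ->
                   System.out.println("consumer " + name + " -> " + id));
               System.out.println("admin -> " + adminId);
           }
       }
   }
   ```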





Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418122441


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer, final 
AtomicReference firstException) {
+if (!groupMetadata.isPresent())
+return;
+maybeAutoCommitSync(timer, firstException);
+timer.update();
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 
timer.remainingMs()), timer, firstException);
+maybeInvokeCommitCallbacks();
+maybeRevokePartitions(timer, firstException);
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 
timer.remainingMs()), timer, firstException);
+}
+
+private void waitOnEventCompletion(final ConsumerCloseApplicationEvent 
event,
+   final Timer timer,
+   final AtomicReference 
firstException) {
+try {
+applicationEventHandler.addAndGet(event, timer);
+} catch (TimeoutException e) {

Review Comment:
   We don't really throw timeout exceptions during closing, because if the user tries to close with a 0 duration then all ops would time out. The current implementation just polls, but since we cannot directly poll the client, we need to either wait until the future completes or times out, and keep going.






Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-06 Thread via GitHub


aliehsaeedii commented on PR #14626:
URL: https://github.com/apache/kafka/pull/14626#issuecomment-1843861522

   > @aliehsaeedii @mjsax The last build has many failed tests: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14626/26/tests.
 I also see them in other builds now. Are they related to this PR?
   
   Thanks @dajac. We will merge our fixing PR after Jenkins builds.





Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418097344


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int,
consumerId: String,
offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
-   transactionalId: String = null,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
-// first filter out partitions with offset metadata size exceeding limit
-val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
-  validateOffsetMetadataLength(offsetAndMetadata.metadata)
-}
-
 group.inLock {
   if (!group.hasReceivedConsistentOffsetCommits)
 warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
   s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
   s"should be avoided.")
 }
 
-val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-// construct the message set to append
+val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+  validateOffsetMetadataLength(offsetAndMetadata.metadata)
+}
 if (filteredOffsetMetadata.isEmpty) {
   // compute the final error codes for the commit response
   val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
   responseCallback(commitStatus)
-} else {
-  getMagic(partitionFor(group.groupId)) match {
-case Some(magicValue) =>
-  // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-  val timestampType = TimestampType.CREATE_TIME
-  val timestamp = time.milliseconds()
-
-  val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
-val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
-new SimpleRecord(timestamp, key, value)
-  }
-  val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-  val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-
-  if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
-
-  val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
-producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-  records.foreach(builder.append)
-  val entries = Map(offsetTopicPartition -> builder.build())
-
-  // set the callback function to insert offsets into cache after log 
append completed
-  def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
-// the append response should only contain the topics partition
-if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
-  throw new IllegalStateException("Append status %s should only 
have one partition %s"
-.format(responseStatus, offsetTopicPartition))
-
-// construct the commit response status and insert
-// the offset and metadata to cache if the append status has no 
error
-val status = responseStatus(offsetTopicPartition)
-
-val responseError = group.inLock {
-  if (status.error == Errors.NONE) {
-if (!group.is(Dead)) {
-  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-if (isTxnOffsetCommit)
-  group.onTxnOffsetCommitAppend(producerId, 
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
-else
-  group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-  }
-}
-
-// Record the number of offsets committed to the log
-offsetCommitsSensor.record(records.size)
-
-   

Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418096753


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
   delayedProduceLock = Some(group.lock),
   responseCallback = callback,
   requestLocal = requestLocal,
-  transactionalId = transactionalId)
+  verificationGuards = verificationGuards,
+  preAppendErrors = preAppendErrors)
+  }
+
+  def generateOffsetRecords(magicValue: Byte,
+isTxnOffsetCommit: Boolean,
+groupId: String,
+offsetTopicPartition: TopicPartition,
+filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+producerId: Long,
+producerEpoch: Short): Map[TopicPartition, 
MemoryRecords] = {
+  // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
+  val timestampType = TimestampType.CREATE_TIME
+  val timestamp = time.milliseconds()
+
+  val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+val key = GroupMetadataManager.offsetCommitKey(groupId, 
topicIdPartition.topicPartition)
+val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
+new SimpleRecord(timestamp, key, value)
+  }
+  val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
+
+  if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to 
make a transaction offset commit with an invalid magic: " + magicValue)
+
+  val builder = MemoryRecords.builder(buffer, magicValue, compressionType, 
timestampType, 0L, time.milliseconds(),
+producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
+  records.foreach(builder.append)
+  Map(offsetTopicPartition -> builder.build())
+  }
+
+  def createPutCacheCallback(isTxnOffsetCommit: Boolean,
+ group: GroupMetadata,
+ consumerId: String,
+ offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+ filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicIdPartition, 
Errors] => Unit,
+ producerId: Long = RecordBatch.NO_PRODUCER_ID,
+ records: Map[TopicPartition, MemoryRecords],
+ preAppendErrors: Map[TopicPartition, 
LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = 
{
+val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+// set the callback function to insert offsets into cache after log append 
completed
+def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
+  // the append response should only contain the topics partition
+  if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
+throw new IllegalStateException("Append status %s should only have one 
partition %s"
+  .format(responseStatus, offsetTopicPartition))
+
+  // construct the commit response status and insert
+  // the offset and metadata to cache if the append status has no error
+  val status = responseStatus(offsetTopicPartition)
+
+  val responseError = group.inLock {
+if (status.error == Errors.NONE) {
+  if (!group.is(Dead)) {
+filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+  if (isTxnOffsetCommit)
+group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+  else
+group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+}
+  }
+
+  // Record the number of offsets committed to the log
+  offsetCommitsSensor.record(records.size)
+
+  Errors.NONE
+} else {
+  if (!group.is(Dead)) {
+if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+  removeProducerGroup(producerId, group.groupId)
+filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+  if (isTxnOffsetCommit)
+group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
+  else
+

Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


jolshan commented on PR #14774:
URL: https://github.com/apache/kafka/pull/14774#issuecomment-1843841126

   @junrao Thanks for taking a look. I was just rewriting the code to make this 
clearer. I will now take a look at @artemlivshits's questions and yours about 
locking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418093193


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-if (authorizedRequestInfo.isEmpty)
-  sendResponseCallback(Map.empty)
-else {
-  val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val transactionVerificationEntries = new 
ReplicaManager.TransactionVerificationEntries
 
-  // call the replica manager to append messages to the replicas
+def postVerificationCallback(newRequestLocal: RequestLocal)

Review Comment:
   The new code fixes this.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: 
mutable.Map[TopicPartition, VerificationGuard],
-  entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
-  verifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  unverifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  errorEntries: 
mutable.Map[TopicPartition, Errors]): Unit = {
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, 
MemoryRecords],
+ responseCallback: 
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+// If required.acks is outside accepted range, something is wrong with the 
client
+// Just return an error and don't handle the request at all
+val responseStatus = entries.map { case (topicPartition, _) =>
+  topicPartition -> new PartitionResponse(
+Errors.INVALID_REQUIRED_ACKS,
+LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+RecordBatch.NO_TIMESTAMP,
+LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+  )
+}
+responseCallback(responseStatus)
+  }
+
+  /**
+   * Apply the postVerificationCallback asynchronously only after verifying the partitions have been added to the transaction.
+   * The postVerificationCallback takes the arguments of the requestLocal for the thread that will be doing the append as
+   * well as a mapping of topic partitions to LogAppendResult for the partitions that saw errors when verifying.
+   *
+   * This method will start the verification process for all the topicPartitions in entriesPerPartition and supply the
+   * postVerificationCallback to be run on a request handler thread when the response is received.
+   *
+   * @param entriesPerPartition            the records per partition to be appended and therefore need verification
+   * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards
+   * @param transactionalId                the id for the transaction
+   * @param requestLocal                   container for the stateful instances scoped to this request -- this must correspond to the
+   *                                       thread calling this method
+   * @param postVerificationCallback       the method to be called when verification completes and the verification errors
+   *                                       and the thread's RequestLocal are supplied
+   */
+  def appendRecordsWithTransactionVerification(entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
+   transactionVerificationEntries: 
TransactionVerificationEntries,
+   transactionalId: String,
+   requestLocal: RequestLocal,
+   postVerificationCallback: 
RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = {
+if (transactionalId != null && 
config.transactionPartitionVerificationEnable && 
addPartitionsToTxnManager.isDefined)
+  partitionEntriesForVerification(transactionVerificationEntries, 
entriesPerPartition)
+
+val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) => 
Unit =
+  executePostVerificationCallback(
+transactionVerificationEntries,
+postVerificationCallback,
+  )
+
+if (transactionVerificationEntries.unverified.isEmpty) {
+  onVerificationComplete(requestLocal, 
transactionVerificationEntries.errors.toMap)
+} else {
+  // For unverified entries, send a request to verify. When verified, the 
append process will proceed via the callback.
+  // We verify above that all partitions use the same producer ID.
+  val batchInfo = 

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1843813913

   @junrao: Thanks for reviewing this. It seems there are currently some issues 
with failing tests. See 
https://github.com/apache/kafka/pull/14838#issuecomment-1843693525 
   The test failures for this PR don't seem to be related to the changes, but 
to be sure we can also wait a bit longer and see if the state of the build 
improves.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: fix unit test [kafka]

2023-12-06 Thread via GitHub


mjsax commented on code in PR #14947:
URL: https://github.com/apache/kafka/pull/14947#discussion_r1418059234


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##
@@ -230,13 +230,19 @@ public void shouldFindAll() {
 
segmentValue.findAll(testCase.records.get(testCase.records.size() - 
1).timestamp, testCase.records.get(0).timestamp);
 
 int i = 0;
+int index = 0;
 for (final TestRecord expectedRecord : testCase.records) {
-final long expectedValidTo = i == 0 ? testCase.nextTimestamp : 
testCase.records.get(i - 1).timestamp;
-assertThat(results.get(i).index(), equalTo(i));
+if (expectedRecord.value == null) { // should not return 
tombstones
+index++;
+continue;
+}
+final long expectedValidTo = index== 0 ? 
testCase.nextTimestamp : testCase.records.get(index - 1).timestamp;

Review Comment:
   ```suggestion
   final long expectedValidTo = index == 0 ? 
testCase.nextTimestamp : testCase.records.get(index - 1).timestamp;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


wcarlson5 commented on code in PR #14935:
URL: https://github.com/apache/kafka/pull/14935#discussion_r1418058902


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   and here



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -454,4 +485,19 @@ public void shutdown() {
 public Map consumerMetrics() {
 return Collections.unmodifiableMap(globalConsumer.metrics());
 }
+
+public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   Same comment here if you don't mind



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


wcarlson5 commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
  * @throws TimeoutException Indicates that a request timed out.
  * @throws StreamsException For any other error that might occur.
  */
-public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+public synchronized ClientInstanceIds clientInstanceIds(final Duration 
timeout) {

Review Comment:
   I think this should take care of most of the threading issues, but it still 
leaves it easy to introduce bugs in the future.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   Can we add a comment here that this isn't thread-safe? Just for the next 
person who tries to use it and introduces a nasty race condition. The fact that 
it returns futures might make people think it is.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -713,6 +727,48 @@ boolean runLoop() {
 return true;
 }
 
+// visible for testing
+void maybeGetClientInstanceIds() {

Review Comment:
   That is actually okay, as each time the call is made it overwrites 
`mainConsumerInstanceIdFuture` anyway, so there is only ever one future to 
complete.
   
   Not that this couldn't still be an issue, but with the caller being 
synchronized it won't be for this feature.
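   
   A minimal sketch of that point (the field and method names mirror the PR, 
but the body is a simplified assumption, not the actual implementation):
   
   ```java
   import java.time.Duration;
   
   import org.apache.kafka.common.KafkaFuture;
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.internals.KafkaFutureImpl;
   
   class InstanceIdFutureSketch {
       private KafkaFuture<Uuid> mainConsumerInstanceIdFuture;
   
       KafkaFuture<Uuid> mainConsumerInstanceId(final Duration timeout) {
           // Each call replaces the previous future, so at most one pending
           // future exists for maybeGetClientInstanceIds() to complete.
           // (Timeout handling omitted for brevity.)
           mainConsumerInstanceIdFuture = new KafkaFutureImpl<>();
           return mainConsumerInstanceIdFuture;
       }
   }
   ```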



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-12-06 Thread via GitHub


rondagostino commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1418016489


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
+// Schedule assignment request to revert any queued request before 
cancelling
+for {
+  topicPartition <- topicPartitions
+  partitionState <- partitionAssignmentRequestState(topicPartition)
+  if partitionState == QUEUED
+  partition = replicaMgr.getPartitionOrException(topicPartition)
+  topicId <- partition.topicId
+  directoryId <- partition.logDirectoryId()
+  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
+} yield directoryEventHandler.handleAssignment(topicIdPartition, 
directoryId)
+
+super.removePartitions(topicPartitions)
+  }
+
+  // Visible for testing
+  def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: 
ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {

Review Comment:
   nit: missing newline separator between defs



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
+// Schedule assignment request to revert any queued request before 
cancelling
+for {
+  topicPartition <- topicPartitions
+  partitionState <- partitionAssignmentRequestState(topicPartition)
+  if partitionState == QUEUED
+  partition = replicaMgr.getPartitionOrException(topicPartition)
+  topicId <- partition.topicId
+  directoryId <- partition.logDirectoryId()
+  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
+} yield directoryEventHandler.handleAssignment(topicIdPartition, 
directoryId)

Review Comment:
   Seems odd to use `yield` when `directoryEventHandler.handleAssignment()` is 
a void method.  Is there a reason to use `yield`?



##
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##
@@ -268,6 +268,200 @@ class ReplicaAlterLogDirsThreadTest {
 assertEquals(0, thread.partitionCount)
   }
 
+  def updateAssignmentRequestState(thread: ReplicaAlterLogDirsThread, 
partitionId:Int, newState: 
ReplicaAlterLogDirsThread.DirectoryEventRequestState) = {
+topicNames.get(topicId).map(topicName => {
+  thread.updatedAssignmentRequestState(new TopicPartition(topicName, 
partitionId))(newState)
+})
+  }
+  @Test

Review Comment:
   nit: missing newline separator



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+

Re: [PR] KAFKA-15978: Update member information on HB response [kafka]

2023-12-06 Thread via GitHub


AndrewJSchofield commented on PR #14945:
URL: https://github.com/apache/kafka/pull/14945#issuecomment-1843752569

   Investigating failure of new test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-06 Thread via GitHub


rondagostino merged PR #14838:
URL: https://github.com/apache/kafka/pull/14838


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15982) Move GenericGroup state metrics to `GroupCoordinatorMetricsShard`

2023-12-06 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-15982:


 Summary: Move GenericGroup state metrics to 
`GroupCoordinatorMetricsShard`
 Key: KAFKA-15982
 URL: https://issues.apache.org/jira/browse/KAFKA-15982
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


Currently, the generic group state metrics exist inside 
`GroupCoordinatorMetrics` as global metrics. This causes issues as during 
unload, we need to traverse through all groups and decrement the group size 
counters. 

Move the generic group state metrics to the shard level so that when a 
partition is unloaded we automatically remove the counter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-06 Thread via GitHub


junrao commented on PR #14774:
URL: https://github.com/apache/kafka/pull/14774#issuecomment-1843729483

   @jolshan : Thanks for the PR. Should we fix the `append` call in 
`CoordinatorRuntime` (https://github.com/apache/kafka/pull/14705) too? There, a 
partition level lock in `CoordinatorRuntime` is held while checking/updating 
the coordinator state and calling `append`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-06 Thread via GitHub


cmccabe commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843728301

   @rondagostino : I think we can merge this now because I don't think it's 
adding more failures. I do think we need to do another sweep looking for people 
calling `Exit.exit`, but that doesn't need to block here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15981) only record GenericGroup state metrics once per group during load

2023-12-06 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-15981:


 Summary: only record GenericGroup state metrics once per group 
during load
 Key: KAFKA-15981
 URL: https://issues.apache.org/jira/browse/KAFKA-15981
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


Currently, we increment generic group metrics whenever we create a new 
GenericGroup object while loading the partition. This is incorrect, as the 
partition may contain several records for the same group if they are in the 
active segment or if the segment has not yet been compacted.

Instead, increment the metrics once for each group we successfully loaded.
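
An illustrative sketch of that idea (all names hypothetical; it only contrasts 
counting once per loaded group with counting once per replayed record):

```java
import java.util.List;

class GroupLoadMetricsSketch {
    interface MetricsShard { void incrementNumGroups(String groupState); }

    static void onPartitionLoaded(List<String> loadedGroupStates, MetricsShard shard) {
        // Increment exactly once per group that survived the load, regardless
        // of how many records for that group existed in the partition.
        loadedGroupStates.forEach(shard::incrementNumGroups);
    }
}
```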



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-06 Thread via GitHub


gharris1727 commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1418003736


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
 return props;
 }
 
+@Test
+public void testRequestTimeouts() throws Exception {
+final String configTopic = "test-request-timeout-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+// Workaround for KAFKA-15676, which can cause the scheduled rebalance 
delay to
+// be spuriously triggered after the group coordinator for a Connect 
cluster is bounced
+// Set to 1 instead of 0 as another workaround for KAFKA-15693, which 
can cause
+// connectors and tasks to be unassigned indefinitely if the scheduled 
rebalance delay
+// is set to 0
+workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+connect.start();
+connect.assertions().assertAtLeastNumWorkersAreUp(1,
+"Worker did not start in time");
+
+Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+// Create a connector to ensure that the worker has completed startup
+log.info("Creating initial connector");
+connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start 
in time"
+);
+
+// Bring down Kafka, which should cause some REST requests to fail
+log.info("Stopping Kafka cluster");
+connect.kafka().stopOnlyKafka();
+// Allow for the workers to discover that the coordinator is 
unavailable, wait is
+// heartbeat timeout * 2 + 4sec
+Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+connect.requestTimeout(5_000);
+// Try to reconfigure the connector, which should fail with a timeout 
error
+log.info("Trying to reconfigure connector while Kafka cluster is 
down");
+ConnectRestException e = assertThrows(
+ConnectRestException.class,
+() -> connect.configureConnector(CONNECTOR_NAME, 
connectorConfig2)
+);
+assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+assertNotNull(e.getMessage());
+assertTrue(
+"Message '" + e.getMessage() + "' does not match expected 
format",
+e.getMessage().contains("Request timed out. The worker is 
currently flushing updates to the status topic")

Review Comment:
   Ah I see. So the `polling the group coordinator for up to ... or until 
interrupted` stage is only temporary and `flushing updates to the status topic` 
is permanent, so it'll always eventually get stuck on this stage.
   
   Do you think it's possible for the preceding `Thread.sleep()` to cause a 
flake here, if the worker is in the "polling the group coordinator" stage for 
too long? Perhaps we could replace the sleep with a wait-until-condition that 
repeatedly makes the request until the flushing-status-store error appears.
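   
   A rough sketch of that wait-until-condition idea (hypothetical, not code 
from the PR), reusing the test's existing `connect`, `CONNECTOR_NAME`, and 
`connectorConfig2` fixtures; the 30-second bound is an assumption:
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
   import org.apache.kafka.test.TestUtils;
   
   // Inside the test, replacing the Thread.sleep():
   TestUtils.waitForCondition(() -> {
       try {
           connect.configureConnector(CONNECTOR_NAME, connectorConfig2);
           return false; // request unexpectedly succeeded; keep retrying
       } catch (ConnectRestException e) {
           return e.getMessage() != null
                   && e.getMessage().contains("flushing updates to the status topic");
       }
   }, TimeUnit.SECONDS.toMillis(30), "Worker never reached the status-flush stage");
   ```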



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-06 Thread via GitHub


rondagostino commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843693525

   Looking at the [trunk build 
jobs](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/activity?branch=trunk):
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2432/tests
 completed 6 days ago with `25 tests have failed.  There are 21 new tests 
failing, 4 existing failing `
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2437/tests
 completed 5 days ago with `60 tests have failed.  There are 36 new tests 
failing, 24 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2442/tests
 completed 3 days ago with `30 tests have failed.  There are 14 new tests 
failing, 16 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2444/tests
 completed 2 days ago with `53 tests have failed.  There are 37 new tests 
failing, 16 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2445/tests
 completed 2 days ago with `92 tests have failed.  There are 17 new tests 
failing, 75 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2452/tests
 completed 11 hours ago with `111 tests have failed.  There are 49 new tests 
failing, 62 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2453/tests
 completed 7 hours ago with `24 tests have failed.  There are 24 new tests 
failing, 0 existing failing`
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2454/tests
 completed 1 hour ago with `101 tests have failed.  There are 39 new tests 
failing, 62 existing failing`
   
   It feels to me like this is just flakiness -- either in the test or in the 
infrastructure.  There's no rhyme or reason to it.  I am inclined to commit 
this PR.  WDYT @cmccabe?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-06 Thread via GitHub


rondagostino commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843668221

   I think the issue is not with our PR.  Here's another [PR that is basically 
a 2-line change in a single test](https://github.com/apache/kafka/pull/14944) 
and that has [144 test 
failures!](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14944/2/tests).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-06 Thread via GitHub


rondagostino commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843665517

   [Latest builds for JDKs 8 and 21 
failed](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/16/pipeline/).
  Restarted the build: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/17/pipeline
   
   So frustrating.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15347: fix unit test [kafka]

2023-12-06 Thread via GitHub


aliehsaeedii opened a new pull request, #14947:
URL: https://github.com/apache/kafka/pull/14947

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417911671


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -364,10 +377,30 @@ class BrokerLifecycleManager(
 }
 _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
   new BrokerRegistrationResponseHandler())
+communicationInFlight = true
   }
 
+  // The response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here; instead, schedule an event
+  // to continue handling the response on the event handler thread.
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))

Review Comment:
   @cmccabe : Do you have concerns on the usage of `prepend` here?



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   Yes, I agree that it's not straightforward. We can just leave the code as it 
is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1417650715


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -778,18 +783,23 @@ public void testGroupIdNull() {
 
 @Test
 public void testGroupIdNotNullAndValid() {
+// close the default consumer
+shutDown();

Review Comment:
   The test spins up another consumer, so we should shut down the one created 
in `@BeforeEach`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843583099

   Hi @lianetm @cadonna @lucasbru - Thanks for taking the time to review my PR. 
I addressed your comments and left one for @cadonna.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1417898446


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   You are 100% correct.  I submitted a patch for this specific test.  Thanks 
for catching this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15978: Update member information on HB response [kafka]

2023-12-06 Thread via GitHub


lianetm commented on code in PR #14945:
URL: https://github.com/apache/kafka/pull/14945#discussion_r1417895767


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -224,6 +224,13 @@ boolean canAutoCommit() {
 return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
 }
 
+/**
+ * Updates the member ID and epoch upon receiving 
ConsumerGroupHeartbeatResponse.
+ */
+void updateMemberInformation(String memberId, int memberEpoch) {
+groupState.generation = new GroupState.Generation(memberEpoch, 
memberId, null);
+}

Review Comment:
   Just for the record, with the fix in the commit PR the approach is to have 
this CommitManager be a StateListener that gets notified when the epoch 
changes, so it gets the latest value from the MembershipManager. I did the 
listener approach to avoid a circular dependency (the MembershipMgr uses the 
CommitMgr to commit), and truly, this CommitMgr does not need the whole 
MembershipMgr; it just needs the latest ID/epoch, so the listener seems 
appropriate. Anyway, let's have this conversation on the other PR when in 
review.
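   
   A hypothetical shape of that listener approach (the actual interface and 
method names in the commit PR may differ):
   
   ```java
   // The MembershipManager notifies registered listeners whenever the member
   // epoch changes; CommitRequestManager implements this and caches the latest
   // id/epoch, so it needs no reference back to the MembershipManager.
   interface MemberStateListener {
       void onMemberEpochUpdated(int memberEpoch, String memberId);
   }
   ```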



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15978: Update member information on HB response [kafka]

2023-12-06 Thread via GitHub


lianetm commented on code in PR #14945:
URL: https://github.com/apache/kafka/pull/14945#discussion_r1417895767


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -224,6 +224,13 @@ boolean canAutoCommit() {
 return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
 }
 
+/**
+ * Updates the member ID and epoch upon receiving 
ConsumerGroupHeartbeatResponse.
+ */
+void updateMemberInformation(String memberId, int memberEpoch) {
+groupState.generation = new GroupState.Generation(memberEpoch, 
memberId, null);
+}

Review Comment:
   Just for the record, with the fix in the commit PR the approach is to have 
this CommitManager be a StateListener that gets notified when the epoch 
changes, so it gets the latest value from the MembershipManager. I did the 
listener approach to avoid a circular dependency (the MembershipMgr uses the 
CommitMgr to commit), and truly, this CommitMgr does not need the whole 
MembershipMgr; it just needs the latest ID/epoch, so the listener seemed 
enough. Anyway, let's have this conversation on the other PR when in review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15978: Update member information on HB response [kafka]

2023-12-06 Thread via GitHub


lianetm commented on code in PR #14945:
URL: https://github.com/apache/kafka/pull/14945#discussion_r1417892970


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -224,6 +224,13 @@ boolean canAutoCommit() {
 return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
 }
 
+/**
+ * Updates the member ID and epoch upon receiving 
ConsumerGroupHeartbeatResponse.
+ */
+void updateMemberInformation(String memberId, int memberEpoch) {
+groupState.generation = new GroupState.Generation(memberEpoch, 
memberId, null);
+}

Review Comment:
   Yes, I'm on this and the fix will come out in the commit PR, so if we want 
to unblock the e2e I'm ok with keeping this as a temporary fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15978: Update member information on HB response [kafka]

2023-12-06 Thread via GitHub


dajac commented on code in PR #14945:
URL: https://github.com/apache/kafka/pull/14945#discussion_r1417891269


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -224,6 +224,13 @@ boolean canAutoCommit() {
 return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
 }
 
+/**
+ * Updates the member ID and epoch upon receiving 
ConsumerGroupHeartbeatResponse.
+ */
+void updateMemberInformation(String memberId, int memberEpoch) {
+groupState.generation = new GroupState.Generation(memberEpoch, 
memberId, null);
+}

Review Comment:
   There are two other places where the member epoch is updated. Should we also 
call `updateMemberInformation` for those for completeness? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-06 Thread via GitHub


dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1843565757

   It looks like the new failing tests are coming from 
https://github.com/apache/kafka/pull/14626.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2023-12-06 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1417877630


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/CsvUtils.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvUtils {
+private final CsvMapper mapper = new CsvMapper();
+
+ObjectReader readerFor(Class clazz) {
+return mapper.readerFor(clazz).with(getSchema(clazz));
+}
+
+ObjectWriter writerFor(Class clazz) {
+return mapper.writerFor(clazz).with(getSchema(clazz));
+}
+
+private CsvSchema getSchema(Class clazz) {
+String[] fields;
+if (CsvRecordWithGroup.class == clazz)
+fields = CsvRecordWithGroup.FIELDS;
+else if (CsvRecordNoGroup.class == clazz)
+fields = CsvRecordNoGroup.FIELDS;
+else
+throw new IllegalStateException("Unhandled class " + clazz);
+
+return mapper.schemaFor(clazz).sortedBy(fields);
+}
+
+public interface CsvRecord {
+}
+
+public static class CsvRecordWithGroup implements CsvRecord {

Review Comment:
   fixed. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on code in PR #14946:
URL: https://github.com/apache/kafka/pull/14946#discussion_r1417877859


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -321,6 +332,9 @@ public void 
onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
 
+clientTelemetryReporter.ifPresent(reporter -> 
reporter.updateMetricsLabels(
+Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, 
memberId)));

Review Comment:
   @AndrewJSchofield I have added the check with a comment, please see if it 
makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-06 Thread via GitHub


dajac commented on PR #14626:
URL: https://github.com/apache/kafka/pull/14626#issuecomment-1843551173

   @aliehsaeedii @mjsax The last build has many failed tests: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14626/26/tests.
 I also see them in other builds now. Are they related to this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15972) Add support to exclude labels in client telemetry

2023-12-06 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-15972.
---
Resolution: Done

> Add support to exclude labels in client telemetry
> -
>
> Key: KAFKA-15972
> URL: https://issues.apache.org/jira/browse/KAFKA-15972
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> Some of the labels/tags which are present in metric should be skipped while 
> collecting telemetry as data might already be known to broker hence, we 
> should minimize the data transfer. One of such labels is client_id which is 
> already present in RequestContext hence broker can append that label prior 
> emitting metrics to telemetry backend. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15877) Support change of temporality in Java client

2023-12-06 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-15877.
---
Resolution: Done

> Support change of temporality in Java client 
> -
>
> Key: KAFKA-15877
> URL: https://issues.apache.org/jira/browse/KAFKA-15877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15684) Add support to describe all subscriptions through utility

2023-12-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15684.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk.

> Add support to describe all subscriptions through utility
> -
>
> Key: KAFKA-15684
> URL: https://issues.apache.org/jira/browse/KAFKA-15684
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> The open PR to support client-metrics through kafka-configs.sh doesn't list all 
> subscriptions. The functionality is missing because there is no support for 
> listing client subscriptions in the config repository and admin client. This task 
> should provide a workaround to fetch all subscriptions from the config repository 
> by adding a method in KRaftMetadataCache. Later a KIP might be needed to add 
> support in AdminClient.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15877) Support change of temporality in Java client

2023-12-06 Thread Apoorv Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793896#comment-17793896
 ] 

Apoorv Mittal commented on KAFKA-15877:
---

Changes were committed as part of PR: https://github.com/apache/kafka/pull/14620

 

https://github.com/apache/kafka/pull/14620#issuecomment-1826401674

> Support change of temporality in Java client 
> -
>
> Key: KAFKA-15877
> URL: https://issues.apache.org/jira/browse/KAFKA-15877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-06 Thread via GitHub


junrao merged PR #14933:
URL: https://github.com/apache/kafka/pull/14933


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15979) Add KIP-1001 CurrentControllerId metric

2023-12-06 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15979:


 Summary: Add KIP-1001 CurrentControllerId metric
 Key: KAFKA-15979
 URL: https://issues.apache.org/jira/browse/KAFKA-15979
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15980) Add KIP-1001 CurrentControllerId metric

2023-12-06 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15980:


 Summary: Add KIP-1001 CurrentControllerId metric
 Key: KAFKA-15980
 URL: https://issues.apache.org/jira/browse/KAFKA-15980
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on PR #14933:
URL: https://github.com/apache/kafka/pull/14933#issuecomment-1843520738

   > @apoorvmittal10 : Thanks for the updated PR. Just a minor comment.
   
   @junrao Thanks for reviewing the PR again. I have added my thoughts on the 
comment and verified the same locally as well. I have also verified that the 
build and test failures are not related to the PR changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on code in PR #14946:
URL: https://github.com/apache/kafka/pull/14946#discussion_r1417849266


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -321,6 +332,9 @@ public void 
onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
 
+clientTelemetryReporter.ifPresent(reporter -> 
reporter.updateMetricsLabels(
+Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, 
memberId)));

Review Comment:
   Rechecked the code and the placement of the update code seems right, but I 
can optimize by checking whether `memberId` actually changed prior to updating 
it. Making the change.
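   
   A sketch of that guard (illustrative; it follows the field names in the 
diff above but is not the PR's final code):
   
   ```java
   // Only refresh the telemetry label when the member id actually changed,
   // avoiding a redundant update on every heartbeat response.
   // (Assumes the surrounding MembershipManagerImpl context and its imports.)
   if (!response.memberId().equals(this.memberId)) {
       this.memberId = response.memberId();
       clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
           Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId)));
   }
   ```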



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2023-12-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] 
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
>  * NioEchoServer
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
>  * SaslAuthenticatorTest
> Core: [https://github.com/apache/kafka/pull/14754] 
>  * MiniKdc
>  * GssapiAuthenticationTest
>  * MirrorMakerIntegrationTest
>  * SocketServerTest
>  * EpochDrivenReplicationProtocolAcceptanceTest
>  * LeaderEpochIntegrationTest
> Trogdor: [https://github.com/apache/kafka/pull/14771] 
>  * AgentTest
> Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
>  * DedicatedMirrorIntegrationTest
>  * MirrorConnectorsIntegrationTest
>  * MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> Runtime: [https://github.com/apache/kafka/pull/14764] 
>  * ConnectorTopicsIntegrationTest
>  * ExactlyOnceSourceIntegrationTest
>  * WorkerTest
>  * WorkerGroupMemberTest
> Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
>  * IQv2IntegrationTest
>  * MetricsReporterIntegrationTest
>  * NamedTopologyIntegrationTest
>  * PurgeRepartitionTopicIntegrationTest
> These can be addressed by just fixing the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15816: Fix leaked sockets in mirror tests [kafka]

2023-12-06 Thread via GitHub


gharris1727 merged PR #14761:
URL: https://github.com/apache/kafka/pull/14761


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in mirror tests [kafka]

2023-12-06 Thread via GitHub


gharris1727 commented on PR #14761:
URL: https://github.com/apache/kafka/pull/14761#issuecomment-1843486780

   Test failures appear unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1417826343


##
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
 val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
   "--entity-type", "client-metrics",
   "--describe"))
 
-val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
-assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+val resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+val configEntry = new ConfigEntry("metrics", "*")
+val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+when(describeResult.all()).thenReturn(future)
+
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+  override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {
+assertTrue(options.includeSynonyms())
+assertEquals(1, resources.size)
+val resource = resources.iterator.next
+assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+assertTrue(resourceCustom.name == resource.name)
+future.complete(Map(resourceCustom -> new 
Config(util.Collections.singletonList(configEntry))).asJava)
+describeResult
+  }
+}
+
mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom,
+  util.Collections.singletonList(new AlterConfigOp(configEntry, 
AlterConfigOp.OpType.SET))), new AlterConfigsOptions())
+ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+verify(describeResult).all()

Review Comment:
   @junrao The verification confirms that `.all` was called exactly once to 
consume the returned result in `ConfigCommand.scala`, by the code mentioned 
above.
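
   For readers unfamiliar with the pattern under discussion, here is a minimal, 
self-contained Mockito sketch (hypothetical names, not the Kafka test itself) of 
stubbing a result object and then verifying that the code under test consumed it 
exactly once:

   ```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class VerifyConsumedSketch {
    // Hypothetical stand-in for DescribeConfigsResult.
    interface Result {
        CompletableFuture<String> all();
    }

    // Hypothetical stand-in for the code under test.
    static void codeUnderTest(Result result) throws ExecutionException, InterruptedException {
        result.all().get(); // consumes the future returned by the admin call
    }

    public static void main(String[] args) throws Exception {
        Result result = mock(Result.class);
        CompletableFuture<String> future = new CompletableFuture<>();
        when(result.all()).thenReturn(future);

        future.complete("configs");
        codeUnderTest(result);

        verify(result).all(); // fails if all() was not called exactly once
    }
}
   ```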



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener [kafka]

2023-12-06 Thread via GitHub


ableegoldman merged PR #14886:
URL: https://github.com/apache/kafka/pull/14886


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on code in PR #14946:
URL: https://github.com/apache/kafka/pull/14946#discussion_r1417815612


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -321,6 +332,9 @@ public void 
onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
 
+clientTelemetryReporter.ifPresent(reporter -> 
reporter.updateMetricsLabels(
+Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, 
memberId)));

Review Comment:
   Yeah, I needed that feedback; that's why I noticed multiple label updates 
happening and optimized away duplicate labels. Handling duplicates was required 
in any case, but let me check what the right place is to trigger this update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1417807074


##
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
 val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
   "--entity-type", "client-metrics",
   "--describe"))
 
-val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
-assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+val resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+val configEntry = new ConfigEntry("metrics", "*")
+val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+when(describeResult.all()).thenReturn(future)
+
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+  override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {
+assertTrue(options.includeSynonyms())
+assertEquals(1, resources.size)
+val resource = resources.iterator.next
+assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+assertTrue(resourceCustom.name == resource.name)
+future.complete(Map(resourceCustom -> new 
Config(util.Collections.singletonList(configEntry))).asJava)
+describeResult
+  }
+}
+
mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom,
+  util.Collections.singletonList(new AlterConfigOp(configEntry, 
AlterConfigOp.OpType.SET))), new AlterConfigsOptions())
+ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+verify(describeResult).all()

Review Comment:
   `.all` is needed as it verifies the returned result future is actually 
consumed in `ConfigCommand.scala` here: 
https://github.com/apache/kafka/blob/46852eea1c620ff786f4c4c1ff4cbd47f912a1d9/core/src/main/scala/kafka/admin/ConfigCommand.scala#L610



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1417795477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+/**
+ * When the consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+public void resetPollTimer() {
+pollTimer.reset(maxPollIntervalMs);
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs,
+ final 
boolean ignoreResponse) {
+NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(ignoreResponse);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
+return request;
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
boolean ignoreResponse) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-coordinatorRequestManager.coordinator());
+new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+coordinatorRequestManager.coordinator());
 return request.whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), request.handler().completionTimeMs());
+// The response is only ignored when the member becomes stale. This is because the member needs to
+// rejoin on the next poll regardless of whether the server has responded to the heartbeat.
+if (!ignoreResponse) {
+
membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) 
response.responseBody()).data());
+maybeSendGroupMetadataUpdateEvent();

Review Comment:
   @cadonna - can you comment on this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15871) Implement kafka-client-metrics.sh tool

2023-12-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15871.
-
Resolution: Fixed

merged the PR to trunk

> Implement kafka-client-metrics.sh tool
> --
>
> Key: KAFKA-15871
> URL: https://issues.apache.org/jira/browse/KAFKA-15871
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.7.0
>
>
> Implement the `kafka-client-metrics.sh` tool which is introduced in KIP-714 
> and enhanced in KIP-1000.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-06 Thread via GitHub


C0urante commented on PR #12728:
URL: https://github.com/apache/kafka/pull/12728#issuecomment-1843418398

   @mdedetrich Thanks, the changes look good to go. But the build is failing 
because there are new tests on trunk that still use EasyMock/PowerMock. Can you 
rebase on the latest from trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15871: kafka-client-metrics.sh [kafka]

2023-12-06 Thread via GitHub


junrao merged PR #14926:
URL: https://github.com/apache/kafka/pull/14926


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-06 Thread via GitHub


junrao commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1417759706


##
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
 val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
   "--entity-type", "client-metrics",
   "--describe"))
 
-val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
-assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+val resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+val configEntry = new ConfigEntry("metrics", "*")
+val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+when(describeResult.all()).thenReturn(future)
+
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+  override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {
+assertTrue(options.includeSynonyms())
+assertEquals(1, resources.size)
+val resource = resources.iterator.next
+assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+assertTrue(resourceCustom.name == resource.name)
+future.complete(Map(resourceCustom -> new 
Config(util.Collections.singletonList(configEntry))).asJava)
+describeResult
+  }
+}
+
mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom,
+  util.Collections.singletonList(new AlterConfigOp(configEntry, 
AlterConfigOp.OpType.SET))), new AlterConfigsOptions())
+ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+verify(describeResult).all()

Review Comment:
   It seems that we don't need to call `.all()` since we don't do anything with 
the return value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


AndrewJSchofield commented on code in PR #14946:
URL: https://github.com/apache/kafka/pull/14946#discussion_r1417760787


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -321,6 +332,9 @@ public void 
onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
 
+clientTelemetryReporter.ifPresent(reporter -> 
reporter.updateMetricsLabels(
+Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, 
memberId)));

Review Comment:
   This will work, but you are updating the metrics labels on every single 
heartbeat response. The member ID and member epoch are always sent.
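
   One hedged way to address this (a sketch with hypothetical names, not the 
actual ClientTelemetryReporter API): cache the last value that was published and 
forward a label update to the reporter only when it changes.

   ```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class LabelUpdateGuard {
    private final Consumer<Map<String, String>> reporter; // stand-in for updateMetricsLabels
    private String lastMemberId;

    public LabelUpdateGuard(Consumer<Map<String, String>> reporter) {
        this.reporter = reporter;
    }

    // Called on every heartbeat response; pushes a label update only on change.
    public synchronized void onHeartbeatMemberId(String memberId) {
        if (!Objects.equals(lastMemberId, memberId)) {
            lastMemberId = memberId;
            reporter.accept(Collections.singletonMap("group_member_id", memberId));
        }
    }
}
   ```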



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-06 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1417751374


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   This causes the passed-in lambda to be executed after `offsetWriteFuture` 
completes, but it doesn't cause `offsetWriteFuture::get` to block on the 
completion of that lambda, which means that if the caller of 
`ConnectorOffsetBackingStore::set` waits for the returned `Future` to complete 
right now, it'll complete immediately even though it's likely no offsets will 
have been written to either the primary or secondary store.
   
   Also, nit: when researching this behavior, I ran into 
`CompletableFuture::thenRun`, which is a slightly better fit than 
`CompletableFuture::thenAccept` (since we don't use the return value).
   
   ```suggestion
   offsetWriteFuture = offsetWriteFuture.thenRun(() -> {
   ```
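
   To make the distinction concrete, a small standalone sketch (plain Java, not 
Kafka code) showing that `thenRun` returns a new stage; unless the caller keeps 
that returned stage, waiting on the original future does not wait for the 
chained work:

   ```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        AtomicBoolean chainedDone = new AtomicBoolean(false);
        CompletableFuture<Void> base = CompletableFuture.runAsync(ChainingSketch::slowWork);

        // Bug shape: the stage returned by thenRun is discarded, so base.get()
        // waits for slowWork but NOT for the chained action.
        base.thenRun(() -> { slowWork(); chainedDone.set(true); });
        base.get();
        System.out.println("after base.get(): chained done? " + chainedDone.get()); // almost certainly false

        // Fix shape: keep the stage returned by thenRun and wait on that instead.
        CompletableFuture<Void> chained = base.thenRun(() -> chainedDone.set(true));
        chained.get();
        System.out.println("after chained.get(): chained done? " + chainedDone.get()); // true
    }

    private static void slowWork() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
   ```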



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-06 Thread via GitHub


mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1417761864


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1477,6 +1533,29 @@ public Object getStateLock() {
 return stateLock;
 }
 
+public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {
+final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+synchronized (fetchDeadlines) {
+boolean addDeadline = false;

Review Comment:
   This PR itself does not need it yet (but we need it later when we add 
support for restore consumer and producer)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-06 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1417677509


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.

Review Comment:
   This largely duplicates information present in the Javadoc for the method. 
IMO we can remove this entire comment block and, if we really want to make it 
easy for maintainers to understand not just the "what" but the "why" for this 
logic, we can link to https://issues.apache.org/jira/browse/KAFKA-15018 in the 
Javadoc for the method:
   
   ```java
*
* @see <a href="https://issues.apache.org/jira/browse/KAFKA-15018">KAFKA-15018</a> for context on the three-step
* write sequence
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());

[jira] [Resolved] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

2023-12-06 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-15932.
--
Resolution: Fixed

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> ---
>
> Key: KAFKA-15932
> URL: https://issues.apache.org/jira/browse/KAFKA-15932
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: flaky-test
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> ```Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>   at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>   at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> 

Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]

2023-12-06 Thread via GitHub


cadonna merged PR #14912:
URL: https://github.com/apache/kafka/pull/14912


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-12-06 Thread via GitHub


jolshan commented on PR #14702:
URL: https://github.com/apache/kafka/pull/14702#issuecomment-1843376087

   I've restarted the build every day since the approval and have yet to get a 
clean build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 commented on PR #14946:
URL: https://github.com/apache/kafka/pull/14946#issuecomment-1843373904

   @kirktrue @philipnee @AndrewJSchofield Can you please review the change and 
let me know if the change is in the right place for the new consumer?
   
   cc: @mjsax @wcarlson5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]

2023-12-06 Thread via GitHub


apoorvmittal10 opened a new pull request, #14946:
URL: https://github.com/apache/kafka/pull/14946

   - As KIP-848 introduced a new Kafka Consumer, KIP-714 requires further 
changes to capture `group_member_id`.
   - The change also includes a fix to label updates: previously, calling 
`updateLabels` multiple times for the same key would have added a new resource 
attribute every time. Now it updates the map and then constructs the labels, 
which replaces existing label values with the new ones.
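
   A minimal sketch of the "merge then rebuild" fix described above 
(hypothetical names, not the actual reporter API): updates for an existing key 
replace the previous value instead of appending a duplicate attribute.

   ```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelStore {
    private final Map<String, String> labels = new LinkedHashMap<>();

    // Merge incoming updates; an existing key has its value replaced rather
    // than a duplicate resource attribute being appended.
    public synchronized void updateLabels(Map<String, String> updates) {
        labels.putAll(updates);
    }

    // Rebuild the full label set from the merged map.
    public synchronized Map<String, String> currentLabels() {
        return new LinkedHashMap<>(labels);
    }
}
   ```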
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


