[GitHub] [kafka] hudeqi commented on pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


hudeqi commented on PR #13888:
URL: https://github.com/apache/kafka/pull/13888#issuecomment-1600053247

   > @hudeqi I'm using a tool called 
[typos](https://github.com/crate-ci/typos). And it can be integrated with 
GitHub Actions. But I'm not sure if it's a good practice to integrate it with the repo.
   
   Maybe you can try to launch a KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] xiaocairush commented on pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13888:
URL: https://github.com/apache/kafka/pull/13888#issuecomment-1600049673

   @hudeqi I'm using a tool called [typos](https://github.com/crate-ci/typos). 
And it can be integrated with GitHub Actions. But I'm not sure if it's a good 
practice to integrate it with the repo.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1236242369


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerDe.java:
##
@@ -0,0 +1,161 @@
+/*

Review Comment:
   nit: Other serdes follow "Serde" capitalization



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerDe.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerDe implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short keyVersion = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(keyVersion);
+readMessage(keyMessage, keyBuffer, keyVersion, "key");
+
+if (valueBuffer == null) {
+return new Record(
+new ApiMessageAndVersion(keyMessage, keyVersion),
+null
+);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(keyVersion);
+final short valueVersion = readVersion(valueBuffer, "value");
+readMessage(valueMessage, valueBuffer, valueVersion, "value");
+
+return new Record(
+new ApiMessageAndVersion(keyMessage, keyVersion),
+new ApiMessageAndVersion(valueMessage, valueVersion)
+);
+}
+
+private short readVersion(ByteBuffer buffer, String name) throws 
RuntimeException {
+try {
+return buffer.getShort();
+} 
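
The diff above frames each key and value as a version prefix followed by the serialized message, and reads the prefix back with `buffer.getShort()`. A minimal sketch of that framing idea, assuming a 2-byte big-endian version prefix and an opaque byte payload; the class `VersionPrefixedFraming` and its helpers are hypothetical stand-ins, not Kafka's `MessageUtil` or the `RecordSerDe` under review:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative only: a hypothetical stand-in, not Kafka's MessageUtil or RecordSerDe. */
final class VersionPrefixedFraming {

    /** Writes a 2-byte big-endian version followed by the payload bytes. */
    static byte[] toVersionPrefixedBytes(short version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + payload.length);
        buffer.putShort(version);
        buffer.put(payload);
        return buffer.array();
    }

    /** Reads the version prefix; a buffer that is too short becomes a descriptive error. */
    static short readVersion(ByteBuffer buffer, String name) {
        try {
            return buffer.getShort();
        } catch (BufferUnderflowException e) {
            throw new IllegalStateException("Could not read version from " + name + " buffer", e);
        }
    }

    /** Returns whatever follows the version prefix. */
    static byte[] readPayload(ByteBuffer buffer) {
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = toVersionPrefixedBytes((short) 3, "group-1".getBytes(StandardCharsets.UTF_8));
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        short version = readVersion(buffer, "key");
        String payload = new String(readPayload(buffer), StandardCharsets.UTF_8);
        System.out.println("version=" + version + ", payload=" + payload);
    }
}
```

Reading the version first is what lets a deserializer pick the right message type before decoding the rest of the buffer.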

[GitHub] [kafka] xiaocairush commented on pull request #13884: MINOR: fix typos for client

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13884:
URL: https://github.com/apache/kafka/pull/13884#issuecomment-1600037542

   test this please





[GitHub] [kafka] xiaocairush commented on pull request #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-1600024221

   Besides, how do you ensure that several people don't work on the same issue 
simultaneously in Jira when the issue is not assigned? It seems I need a Jira 
account to assign the issue to myself?





[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-20 Thread via GitHub


showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-167864

   > offsets commits are happening somewhere else and that's via the 
producer.sendOffsetsToTransaction(..) e.g
   
   Oh, the EOS case! I didn't consider it, sorry! 
   Hmm... if there is an EOS case to consider, the original cache mechanism will 
not work: the offset commit does not go through the consumer, so the consumer 
has no idea which offsets have been committed. 
   I think we should close this PR and the JIRA ticket as "invalid" and add a 
comment to the JIRA ticket. WDYT?





[GitHub] [kafka] xiaocairush commented on pull request #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-164067

   @divijvaidya I'm using a tool called 
[typos](https://github.com/crate-ci/typos). And it can be integrated with 
GitHub Actions. But I'm not sure if it's a good practice to integrate it with 
repos, because it may fail the CI more frequently. Maybe it's more efficient 
to correct typos monthly with the tool.





[GitHub] [kafka] showuon commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-20 Thread via GitHub


showuon commented on PR #13850:
URL: https://github.com/apache/kafka/pull/13850#issuecomment-151668

   @divijvaidya , sorry, which commits should I check?





[GitHub] [kafka] hudeqi commented on pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


hudeqi commented on PR #13888:
URL: https://github.com/apache/kafka/pull/13888#issuecomment-1599966481

   Seeing that you have found so many typos, is there a convenient way to find 
them? Can you share it so that we can scan globally and fix them all at once? @xiaocairush 





[GitHub] [kafka] lukestephenson-zendesk commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-20 Thread via GitHub


lukestephenson-zendesk commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1236164778


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional 
updateLatestMetadata(
 // Between the time that a topic is deleted and re-created, 
the client may lose track of the
 // corresponding topicId (i.e. `oldTopicId` will be null). In 
this case, when we discover the new
 // topicId, we allow the corresponding leader epoch to 
override the last seen value.
-log.info("Resetting the last seen epoch of partition {} to {} 
since the associated topicId changed from {} to {}",
- tp, newEpoch, oldTopicId, topicId);
+if (oldTopicId != null) {
+log.info("Resetting the last seen epoch of partition {} to 
{} since the associated topicId changed from {} to {}",
+tp, newEpoch, oldTopicId, topicId);
+} else {
+log.debug("Resetting the last seen epoch of partition {} 
to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   Thanks for the response @jolshan. I'm seeing this log statement on multiple 
Kafka producers. It is logged once every 5 minutes, for each topic partition, 
not just on startup. (Came to my attention because we exhausted our log quota 
and this was the biggest cause of logs).
   
   Here is an example log that is generated:
   ```
   [Producer clientId=producer-1] Resetting the last seen epoch of partition 
my.topic-3 to 1286 since the associated topicId changed from null to 
lkaiBwHxQ6iQHE49_AVysQ
   ```
   
   This is generated every 5 minutes for each topic partition (so a topic with 
10 partitions generates 10 log entries). Each time the topic id is updated from 
`null` to `lkaiBwHxQ6iQHE49_AVysQ`. All partitions for the same topic show the 
same topic id.
   
   Running kafka-client 3.4.0 or 3.5.0 on the producers.  Brokers are running 
Kafka 2.8.1.
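
The reordering in the diff above can be sketched in isolation as follows. This is a minimal illustration under assumptions: it uses `java.util.logging` levels as stand-ins (`FINE` roughly playing the role of debug), and the class `EpochResetLogLevelSketch` and its methods are hypothetical, not part of `Metadata.java`:

```java
import java.util.logging.Level;

/** Illustrative only: hypothetical helper, not code from Kafka's Metadata class. */
final class EpochResetLogLevelSketch {

    /**
     * A real topic ID change (old ID known) is worth an INFO line.
     * An old ID of null means the client simply had no ID yet, so it is demoted to FINE.
     */
    static Level levelFor(String oldTopicId) {
        return oldTopicId != null ? Level.INFO : Level.FINE;
    }

    /** Builds the message matching the chosen level. */
    static String messageFor(String partition, int newEpoch, String oldTopicId, String newTopicId) {
        if (oldTopicId != null) {
            return "Resetting the last seen epoch of partition " + partition + " to " + newEpoch
                + " since the associated topicId changed from " + oldTopicId + " to " + newTopicId;
        }
        return "Resetting the last seen epoch of partition " + partition + " to " + newEpoch
            + " since the associated topicId was undefined but is now set to " + newTopicId;
    }

    public static void main(String[] args) {
        System.out.println(levelFor(null) + ": "
            + messageFor("my.topic-3", 1286, null, "lkaiBwHxQ6iQHE49_AVysQ"));
    }
}
```

With this split, the repeated `null` to topic ID transitions described above would only show up when debug-level logging is enabled.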






[GitHub] [kafka] ijuma commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-06-20 Thread via GitHub


ijuma commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1236054069


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##
@@ -431,7 +431,7 @@ private static void skipBytes(InputStream in, int 
bytesToSkip) throws IOExceptio
 
 // Starting JDK 12, this implementation could be replaced by 
InputStream#skipNBytes
 while (bytesToSkip > 0) {
-long ns = in.skip(bytesToSkip);
+int ns = (int) in.skip(bytesToSkip);

Review Comment:
   New issue introduced in trunk after this PR was originally created. There 
are a few others, but I'll address them together with the JDK 20 CI build 
(which will prevent the issue altogether).
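
For context, `InputStream.skip` returns a `long`, so subtracting its result from an `int` counter with `-=` narrows implicitly, which is the kind of compound assignment that javac's `-Xlint:lossy-conversions` check (added in JDK 20) reports. A minimal, self-contained sketch of a skip loop with the narrowing made explicit; the class `SkipBytesSketch` and its helper are hypothetical, and the loop is a simplified illustration, not the exact body of `DefaultRecord.skipBytes`:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Illustrative only: a simplified skip loop, not Kafka's DefaultRecord code. */
final class SkipBytesSketch {

    static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
        while (bytesToSkip > 0) {
            // skip(n) is documented to skip at most n bytes, so the cast cannot overflow here.
            int skipped = (int) in.skip(bytesToSkip);
            if (skipped <= 0) {
                // skip may legitimately return 0; read one byte to distinguish that from EOF.
                if (in.read() == -1) {
                    throw new EOFException("Stream ended with " + bytesToSkip + " bytes left to skip");
                }
                skipped = 1;
            }
            bytesToSkip -= skipped; // int -= int: no implicit narrowing
        }
    }

    /** Convenience wrapper: skips bytes, then returns the next byte (or -1 at end of stream). */
    static int firstByteAfterSkipping(byte[] data, int bytesToSkip) {
        try (InputStream in = new ByteArrayInputStream(data)) {
            skipBytes(in, bytesToSkip);
            return in.read();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = {10, 20, 30, 40, 50};
        System.out.println(firstByteAfterSkipping(data, 3));
    }
}
```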






[GitHub] [kafka] ijuma commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-06-20 Thread via GitHub


ijuma commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1236050527


##
clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java:
##
@@ -34,7 +34,7 @@ public Short deserialize(String topic, byte[] data) {
 short value = 0;
 for (byte b : data) {
 value <<= 8;
-value |= b & 0xFF;
+value |= (short) (b & 0xFF);

Review Comment:
   Fixed SpotBugs issue by changing `(byte)` to `(short)`.
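
To illustrate why the cast sits where it does: `b & 0xFF` is an `int`, so `value |= b & 0xFF` narrows it to `short` implicitly, while `(short) (b & 0xFF)` states the narrowing explicitly. A minimal sketch of the same big-endian decoding loop, assuming a hypothetical class `BigEndianShortSketch` rather than the real `ShortDeserializer`:

```java
/** Illustrative only: mirrors the loop in the diff, not Kafka's ShortDeserializer itself. */
final class BigEndianShortSketch {

    static short deserialize(byte[] data) {
        if (data.length != 2) {
            throw new IllegalArgumentException("Expected 2 bytes but got " + data.length);
        }
        short value = 0;
        for (byte b : data) {
            value <<= 8;
            // Masking with 0xFF drops the sign extension of negative bytes;
            // the explicit cast makes the int-to-short narrowing visible.
            value |= (short) (b & 0xFF);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(deserialize(new byte[] {0x12, 0x34})); // prints 4660 (0x1234)
    }
}
```

Without the mask, a byte such as `(byte) 0x80` would be sign-extended and overwrite the high byte that was shifted in earlier.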






[jira] [Updated] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-20 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15108:
---
Description: 
[Problem]
 - task.timeout.ms does not work when TimeoutException is thrown by streams 
producer
 -- Kafka Streams upgrade guide says, "Kafka Streams is now handling 
TimeoutException thrown by the consumer, producer, and admin client."(1) and 
"To bound how long Kafka Streams retries a task, you can set task.timeout.ms 
(default is 5 minutes)."(1).
 -- However, it doesn't look like task.timeout.ms is working for the streams 
producer, and it seems to keep retrying forever.

[Environment]
 - Kafka Streams 3.5.0

[Reproduction procedure]
 # Create "input-topic" topic
 # Put several messages on "input-topic"
 # Do NOT create the "output-topic" topic, so that a TimeoutException is triggered
 # Create the following simple Kafka streams program; this program just 
transfers messages from "input-topic" to "output-topic".
 -- 
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,"com.example.CustomProductionExceptionHandler");
 // not needed

StreamsBuilder builder = new StreamsBuilder();

builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
{code}
 # Wait for task.timeout.ms (default is 5 minutes).
 ## If the debug log is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITIONs 
will be logged because "output-topic" does not exist.
 ## And every minute, a TimeoutException will be generated (2)
 # ==> However, it doesn't look like task.timeout.ms is working for the streams 
producer, and it seems to keep retrying forever.
 ## My expected behavior is that task.timeout.ms takes effect and the client 
is shut down, because the default behavior is 
StreamThreadExceptionResponse.SHUTDOWN_CLIENT when an exception is thrown.

[Findings from my investigation so far]
 - The TimeoutException thrown by the streams producer is replaced with a 
TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3)
 - After that, no code containing logic related to task.timeout.ms appears to 
be executed.

(1) Kafka Streams upgrade guide
 - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
 - 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
{code:java}
Kafka Streams is now handling TimeoutException thrown by the consumer, 
producer, and admin client. If a timeout occurs on a task, Kafka Streams moves 
to the next task and retries to make progress on the failed task in the next 
iteration. To bound how long Kafka Streams retries a task, you can set 
task.timeout.ms (default is 5 minutes). If a task does not make progress within 
the specified task timeout, which is tracked on a per-task basis, Kafka Streams 
throws a TimeoutException (cf. KIP-572).
{code}
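
The per-task deadline described in the quoted guide text can be sketched roughly as follows. This is only an illustration of the bookkeeping as I understand it from the guide; the class `TaskTimeoutSketch` and its methods `onTimeout` and `onProgress` are hypothetical names, not Kafka Streams internals:

```java
/** Illustrative only: hypothetical per-task deadline tracking, not Kafka Streams code. */
final class TaskTimeoutSketch {
    private final long taskTimeoutMs;
    // -1 means no deadline is currently being tracked (assumes non-negative timestamps).
    private long deadlineMs = -1L;

    TaskTimeoutSketch(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /**
     * Called when a client call for this task times out.
     * The first timeout starts the clock; later ones fail once the deadline has passed.
     */
    void onTimeout(long nowMs) {
        if (deadlineMs < 0) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            throw new IllegalStateException(
                "Task made no progress for more than " + taskTimeoutMs + " ms");
        }
    }

    /** Called when the task makes progress; clears the deadline. */
    void onProgress() {
        deadlineMs = -1L;
    }

    public static void main(String[] args) {
        TaskTimeoutSketch task = new TaskTimeoutSketch(300_000L); // 5 minutes, the documented default
        task.onTimeout(0L);        // starts the clock
        task.onTimeout(60_000L);   // still within the deadline, the task would be retried
        try {
            task.onTimeout(300_001L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The behavior reported in this ticket corresponds to the case where the producer's TimeoutException never reaches bookkeeping of this kind.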

(2) TimeoutException occurs
{code:java}
2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Error while fetching metadata with correlation id 1065 : 
{output-topic=UNKNOWN_TOPIC_OR_PARTITION}
2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Requesting metadata update for topic output-topic due to error 
UNKNOWN_TOPIC_OR_PARTITION
2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Updated cluster metadata updateVersion 1064 to 
MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', 
nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094
 (id: 0 rack: null), 
1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094
 (id: 1 rack: null), 
2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}, partitions=[], 
controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}
2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present 
in metadata after 6 ms.

[jira] [Commented] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735476#comment-17735476
 ] 

Matthias J. Sax commented on KAFKA-15108:
-

There are a few cases for which we cannot handle a `TimeoutException` more 
gracefully, and the docs gloss over this fact. – The scenario you describe is 
one of these cases.

I agree that we should maybe try to include it – the challenge (and why it was 
not included in the original work) is that it will need different handling 
compared to how we handle `TimeoutException` for the regular case...

> task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
> -
>
> Key: KAFKA-15108
> URL: https://issues.apache.org/jira/browse/KAFKA-15108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: Tomonari Yamashita
>Priority: Major
>

[GitHub] [kafka] lmr3796 commented on a diff in pull request #13187: MINOR: Log lastCaughtUpTime on ISR shrinkage

2023-06-20 Thread via GitHub


lmr3796 commented on code in PR #13187:
URL: https://github.com/apache/kafka/pull/13187#discussion_r1235962927


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1046,10 +1046,14 @@ class Partition(val topicPartition: TopicPartition,
   partitionState match {
 case currentState: CommittedPartitionState if 
outOfSyncReplicaIds.nonEmpty =>
   val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
-val logEndOffsetMessage = getReplica(replicaId)
-  .map(_.stateSnapshot.logEndOffset.toString)
+val replicaStateSnapshot = 
getReplica(replicaId).map(_.stateSnapshot)
+val logEndOffsetMessage = replicaStateSnapshot
+  .map(_.logEndOffset.toString)
   .getOrElse("unknown")
-s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
+val lastCaughtUpTimeMessage = replicaStateSnapshot
+  .map(_.lastCaughtUpTimeMs.toString)
+  .getOrElse("unknown")
+s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage, 
lastCaughtUpTime: $lastCaughtUpTimeMessage)"

Review Comment:
   Hi @divijvaidya thanks for the great idea.
   
   Just added the suffix






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13187: MINOR: Log lastCaughtUpTime on ISR shrinkage

2023-06-20 Thread via GitHub


divijvaidya commented on code in PR #13187:
URL: https://github.com/apache/kafka/pull/13187#discussion_r1235943898


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1046,10 +1046,14 @@ class Partition(val topicPartition: TopicPartition,
   partitionState match {
 case currentState: CommittedPartitionState if 
outOfSyncReplicaIds.nonEmpty =>
   val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
-val logEndOffsetMessage = getReplica(replicaId)
-  .map(_.stateSnapshot.logEndOffset.toString)
+val replicaStateSnapshot = 
getReplica(replicaId).map(_.stateSnapshot)
+val logEndOffsetMessage = replicaStateSnapshot
+  .map(_.logEndOffset.toString)
   .getOrElse("unknown")
-s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
+val lastCaughtUpTimeMessage = replicaStateSnapshot
+  .map(_.lastCaughtUpTimeMs.toString)
+  .getOrElse("unknown")
+s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage, 
lastCaughtUpTime: $lastCaughtUpTimeMessage)"

Review Comment:
   Let's add a unit to the time here; otherwise it is sometimes difficult to 
figure out the unit. May I suggest changing the name of lastCaughtUpTime to 
`lastCaughtUpTimeMs` instead? It is also consistent with 
https://github.com/apache/kafka/blob/474053d2973b8790e50ccfe1bb0699694b0de1c7/core/src/main/scala/kafka/cluster/Replica.scala#L178






[GitHub] [kafka] jolshan commented on pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-20 Thread via GitHub


jolshan commented on PR #13787:
URL: https://github.com/apache/kafka/pull/13787#issuecomment-1599609852

   I'm getting tripped up on some flaky tests -- namely 
   
[kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupId__/)
   
[kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupId___2/)
   
[kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsTest/testBumpTransactionalEpoch_String__quorum_kraft/)
   
[kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsTest/testBumpTransactionalEpoch_String__quorum_kraft_2/)
   
   Both involve restarting brokers, so I suspect I'm missing some logic there. 
testBumpTransactionalEpoch was failing before, but with a different error, so 
I've filed https://issues.apache.org/jira/browse/KAFKA-15099 for that. 





[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1235928525


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,13 +50,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   That's a good call. Thanks






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-20 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1235924422


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,13 +50,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   You probably want to unregister these metrics when the thread is shut down. 
Also, please add a test to validate that the metrics are being removed correctly.
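
A minimal sketch of the pattern requested above, written in plain Java rather than the Scala under review. `MeterRegistry`, `VerificationManager`, and their methods are hypothetical stand-ins invented for this example, not Kafka's `KafkaMetricsGroup` API. The only point is that every metric registered in the constructor has a matching removal on the shutdown path, so a test can assert that nothing is left behind.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics registry; not Kafka's KafkaMetricsGroup.
class MeterRegistry {
    private final Map<String, LongAdder> meters = new ConcurrentHashMap<>();

    // Registers the meter under the given name if it is not present yet.
    LongAdder newMeter(String name) {
        return meters.computeIfAbsent(name, n -> new LongAdder());
    }

    void removeMetric(String name) {
        meters.remove(name);
    }

    // Read-only view of the names that are currently registered.
    Set<String> registeredNames() {
        return Collections.unmodifiableSet(meters.keySet());
    }
}

// Hypothetical manager that owns one meter and removes it on shutdown.
class VerificationManager implements AutoCloseable {
    static final String FAILURE_RATE = "VerificationFailureRate";

    private final MeterRegistry registry;
    private final LongAdder failures;

    VerificationManager(MeterRegistry registry) {
        this.registry = registry;
        this.failures = registry.newMeter(FAILURE_RATE);
    }

    void recordFailure() {
        failures.increment();
    }

    @Override
    public void close() {
        // Mirror every registration in the constructor with a removal here.
        registry.removeMetric(FAILURE_RATE);
    }
}
```

A test for this shape would create the manager, confirm that the meter name is registered, call `close()`, and then check that `registeredNames()` is empty.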






[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1235920177


##
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##
@@ -196,6 +201,110 @@ class ReplicaAlterLogDirsThreadTest {
 assertEquals(0, thread.partitionCount)
   }
 
+  @Test
+  def shouldResumeCleanLogDirAfterMarkPartitionFailed(): Unit = {
+val brokerId = 1
+val partitionId = 0
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, 
"localhost:1234"))
+
+val partition = Mockito.mock(classOf[Partition])
+val replicaManager = Mockito.mock(classOf[ReplicaManager])
+val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+val futureLog = Mockito.mock(classOf[UnifiedLog])
+val logManager = Mockito.mock(classOf[LogManager])
+
+val logs = new Pool[TopicPartition, UnifiedLog]()
+logs.put(t1p0, futureLog)
+val logCleaner = new LogCleaner(new CleanerConfig(true),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = logs,
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = new MockTime)
+
+val leaderEpoch = 5
+val logEndOffset = 0
+
+when(partition.partitionId).thenReturn(partitionId)
+when(replicaManager.metadataCache).thenReturn(metadataCache)
+when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
+when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
+when(replicaManager.onlinePartition(t1p0)).thenReturn(Some(partition))
+when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+when(replicaManager.logManager).thenReturn(logManager)
+doAnswer(_ => {
+  logCleaner.abortAndPauseCleaning(t1p0)
+}).when(logManager).abortAndPauseCleaning(t1p0)
+doAnswer(_ => {
+  logCleaner.resumeCleaning(Seq(t1p0))
+}).when(logManager).resumeCleaning(t1p0)
+
+when(quotaManager.isQuotaExceeded).thenReturn(false)
+
+when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, 
fetchOnlyFromLeader = false))
+  .thenReturn(new EpochEndOffset()
+.setPartition(partitionId)
+.setErrorCode(Errors.NONE.code)
+.setLeaderEpoch(leaderEpoch)
+.setEndOffset(logEndOffset))
+when(partition.futureLocalLogOrException).thenReturn(futureLog)
+doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
+when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(false)
+
+when(futureLog.logStartOffset).thenReturn(0L)
+when(futureLog.logEndOffset).thenReturn(0L)
+when(futureLog.latestEpoch).thenReturn(None)
+
+val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
+  config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
+val responseData = new FetchPartitionData(
+  Errors.NONE,
+  0L,
+  0L,
+  MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, 
"foo".getBytes(StandardCharsets.UTF_8))),
+  Optional.empty(),
+  OptionalLong.empty(),
+  Optional.empty(),
+  OptionalInt.empty(),
+  false)
+mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, 
responseData)
+
+val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, 
quotaManager)
+val thread = new ReplicaAlterLogDirsThread(
+  "alter-logs-dirs-thread",
+  leader,
+  failedPartitions,
+  replicaManager,
+  quotaManager,
+  new BrokerTopicStats,
+  config.replicaFetchBackoffMs)
+
+// before starting the fetch, pause the clean of the partition.
+logManager.abortAndPauseCleaning(t1p0)
+thread.addPartitions(Map(t1p0 -> initialFetchState(fetchOffset = 0L, 
leaderEpoch)))
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+
+// first test: get handle without exception
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenReturn(None)
+
+thread.doWork()
+
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+assertTrue(logCleaner.isCleaningInStatePaused(t1p0))
+
+// second test: process partition data with throwing a 
KafkaStorageException.
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenThrow(new KafkaStorageException("disk error"))

Review Comment:
   My question is for the replica partition. How do we recover (or ensure we 
don't recover) that one?
   






[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1235919152


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -706,6 +706,8 @@ abstract class AbstractFetcherThread(name: String,
*- the request succeeded or
*- it was fenced and this thread hasn't received new epoch, which means 
we need not backoff and retry as the
*partition is moved to failed state.
+   *- for ReplicaAlterLogDirsThread, "OutOfRange" error is generated when 
starting fetch to leader (source dir,

Review Comment:
   That's another way of phrasing my question -- thanks for checking :) 






[GitHub] [kafka] divijvaidya commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-20 Thread via GitHub


divijvaidya commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1599595596

   1. We might end up breaking customers' application code for exception 
handling with this change. What is our plan to prevent inadvertent impact 
on customers' code on upgrade?
   2. This should probably be accompanied by changes in the docs and the notable 
changes section for 3.6.0 - 
https://kafka.apache.org/documentation.html#upgrade_350_notable 





[GitHub] [kafka] jolshan commented on a diff in pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13876:
URL: https://github.com/apache/kafka/pull/13876#discussion_r1235917215


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1013,16 +1013,8 @@ private void maybeFailWithError() {
 if (!hasError()) {
 return;
 }
-// for ProducerFencedException, do not wrap it as a KafkaException
-// but create a new instance without the call trace since it was not 
thrown because of the current call
-if (lastError instanceof ProducerFencedException) {
-throw new ProducerFencedException("Producer with transactionalId 
'" + transactionalId
-+ "' and " + producerIdAndEpoch + " has been fenced by 
another producer " +
-"with the same transactionalId");
-}
-if (lastError instanceof InvalidProducerEpochException) {

Review Comment:
   I saw we had some tests that verified producer fenced, but do we not have 
any for invalid producer epoch here?






[GitHub] [kafka] jolshan commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-20 Thread via GitHub


jolshan commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1599593794

   Thanks @lucasbru -- just had one question. Everything else looks pretty good.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-06-20 Thread via GitHub


divijvaidya commented on code in PR #13881:
URL: https://github.com/apache/kafka/pull/13881#discussion_r1235909656


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -94,8 +94,8 @@ public class ProducerConfig extends AbstractConfig {
  + "batch size is under this 
batch.size setting.";
 
 /** partitioner.adaptive.partitioning.enable */
-public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG 
= "partitioner.adaptive.partitioning.enable";

Review Comment:
   This is a public field in a public class: 
https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html#PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG
 
   
   Many customers may have code that relies on this typo, and this 
change will break those customers. There is very little upside in changing it 
now and a huge downside with respect to backward compatibility. I suggest 
keeping it as is for now. We will have to live with it until the next major version (4.x).
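
One way to reconcile both concerns, sketched under the assumption that adding a new public constant is acceptable (in Kafka that would normally require a KIP): keep the misspelled name as a deprecated alias and introduce the correctly spelled constant next to it. `ExampleProducerConfig` is a hypothetical class made up for this illustration, not the real `ProducerConfig`.

```java
// Hypothetical config class for illustration; not Kafka's ProducerConfig.
class ExampleProducerConfig {
    /** Correctly spelled constant that new code should reference. */
    public static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG =
        "partitioner.adaptive.partitioning.enable";

    /**
     * Misspelled name kept so that existing callers continue to compile.
     * @deprecated use {@link #PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG}
     */
    @Deprecated
    public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG =
        PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG;
}
```

Both names resolve to the same config key, so existing applications keep working while the compiler nudges callers toward the corrected name. The alias could then be dropped in a major release.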






[GitHub] [kafka] divijvaidya commented on pull request #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


divijvaidya commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-1599501486

   Also, @xiaocairush is it possible to automate checking for these spelling typos 
in checkstyle or some other build plugin? What tool did you use to find these?





[GitHub] [kafka] divijvaidya merged pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


divijvaidya merged PR #13888:
URL: https://github.com/apache/kafka/pull/13888





[GitHub] [kafka] divijvaidya commented on pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


divijvaidya commented on PR #13888:
URL: https://github.com/apache/kafka/pull/13888#issuecomment-1599495288

   Unrelated test failures:
   ```
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13888/2/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 17 and Scala 2.13 / 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13888/2/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_17_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_kraft/)
   ```





[jira] [Assigned] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15053:
-

Assignee: Bo Gao

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Assignee: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, config 
> {{security.protocol}} now only allows upper case values such as PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like 
> sasl_ssl and ssl were also supported; there is even case-insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle the lower case values.
> I think we should treat this as a regression, since lower case values are 
> no longer supported as of 3.3.0. From 3.3.0 onward, we get an error like 
> this when using the lower case value sasl_ssl:
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
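
The regression described above can be illustrated with a small sketch. The enum and both methods below are hypothetical stand-ins written for this example, not Kafka's `SecurityProtocol` or `ConfigDef` code. A strict whitelist check rejects `sasl_ssl`, while normalizing the value to upper case before the lookup accepts it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for illustration; not Kafka's actual classes.
class SecurityProtocolCheck {
    enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    static final List<String> VALID_NAMES =
        Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // Strict whitelist check: only exact upper-case names pass.
    static boolean isValidStrict(String value) {
        return VALID_NAMES.contains(value);
    }

    // Case-insensitive lookup: normalize before resolving the enum constant.
    static Protocol forName(String value) {
        return Protocol.valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```

If a strict check like `isValidStrict` runs before a lenient lookup like `forName`, the lenient path is never reached for lower case input, which matches the behavior reported in this issue.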



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235823024


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+sessionTimeoutMs > groupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, 
isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+String joinReason = request.reason();
+if (joinReason == null || joinReason.isEmpty()) {
+joinReason = "not provided";
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+);
+
+} else if (isUnknownMember) {
+result = genericGroupJoinNewMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+} else {
+result = genericGroupJoinExistingMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+}
+
+// Attempt to complete join group phase. We do not complete
+// the join group phase if this is the initial rebalance.
+if (group.isInState(PREPARING_REBALANCE) &&
+group.hasAllMembersJoined() &&
+group.generationId() != 0
+) {
+completeGenericGroupJoin(group);
+}
+}
+
+return result;
+}
+
+/**
+ * Handle a new member generic group join.
+ *
+ * @param context The request context.
+ * @param request The join group request.
+ * @param group   The group to add the member.
+ * @param joinReason  The client reason for the join request.
+ * @param responseFuture  The response future to complete.
+ *
+ * @return The coordinator result that will be appended to the log.
+ */
+private CoordinatorResult, Record> 
genericGroupJoinNewMember(
+RequestContext context,
+JoinGroupRequestData request,
+GenericGroup group,
+String joinReason,
+CompletableFuture responseFuture
+) {
+List<Protocol> protocols = new ArrayList<>();
+request.protocols().forEach(protocol -> protocols.add(new 
Protocol(protocol.name(), protocol.metadata())));
+if (group.isInState(DEAD)) {
+// if the group is marked as dead, it means some other thread has 
just removed the group
+// from the coordinator metadata; it is likely that the group has 
migrated to some other
+// coordinator OR the group is in a transient unstable phase. Let 
the member retry
+// finding the correct coordinator and rejoin.
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235822238



[GitHub] [kafka] divijvaidya merged pull request #13886: MINOR: Fix typos for group coordinator

2023-06-20 Thread via GitHub


divijvaidya merged PR #13886:
URL: https://github.com/apache/kafka/pull/13886





[GitHub] [kafka] divijvaidya merged pull request #13887: MINOR: Fix typos for server common

2023-06-20 Thread via GitHub


divijvaidya merged PR #13887:
URL: https://github.com/apache/kafka/pull/13887





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235818207



[GitHub] [kafka] divijvaidya commented on pull request #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


divijvaidya commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-1599479499

   Please rebase onto trunk; #13882 has been merged.





[GitHub] [kafka] divijvaidya merged pull request #13882: MINOR: Fix some typos for core

2023-06-20 Thread via GitHub


divijvaidya merged PR #13882:
URL: https://github.com/apache/kafka/pull/13882





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235813521


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+sessionTimeoutMs > groupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, 
isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+String joinReason = request.reason();
+if (joinReason == null || joinReason.isEmpty()) {
+joinReason = "not provided";
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+);
+
+} else if (isUnknownMember) {
+result = genericGroupJoinNewMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+} else {
+result = genericGroupJoinExistingMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+}
+
+// Attempt to complete join group phase. We do not complete
+// the join group phase if this is the initial rebalance.
+if (group.isInState(PREPARING_REBALANCE) &&
+group.hasAllMembersJoined() &&
+group.generationId() != 0
+) {
+completeGenericGroupJoin(group);
+}
+}
+
+return result;
+}
+
+/**
+ * Handle a new member generic group join.
+ *
+ * @param context The request context.
+ * @param request The join group request.
+ * @param group   The group to add the member.
+ * @param joinReason  The client reason for the join request.
+ * @param responseFuture  The response future to complete.
+ *
+ * @return The coordinator result that will be appended to the log.
+ */
+private CoordinatorResult, Record> 
genericGroupJoinNewMember(
+RequestContext context,
+JoinGroupRequestData request,
+GenericGroup group,
+String joinReason,
+CompletableFuture responseFuture
+) {
+List protocols = new ArrayList<>();
+request.protocols().forEach(protocol -> protocols.add(new Protocol(protocol.name(), protocol.metadata())));
+if (group.isInState(DEAD)) {
+// if the group is marked as dead, it means some other thread has 
just removed the group
+// from the coordinator metadata; it is likely that the group has 
migrated to some other
+// coordinator OR the group is in a transient unstable phase. Let 
the member retry
+// finding the correct coordinator and rejoin.
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13876:
URL: https://github.com/apache/kafka/pull/13876#discussion_r1235809528


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1013,16 +1013,8 @@ private void maybeFailWithError() {
 if (!hasError()) {
 return;
 }
-// for ProducerFencedException, do not wrap it as a KafkaException

Review Comment:
   Did we ever get the rationale for why this decision was made? It seems to 
suggest here that the call stack will be incorrect so we would rather have an 
empty one?
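
For context on the call-stack question, here is a minimal Java sketch (not taken from the PR; the class and method names are hypothetical). It shows that an exception object keeps the stack trace of the place where it was created, while a wrapper created at the throw site carries the caller's stack and exposes the original as its cause.

```java
class WrapDemo {
    // Hypothetical stand-in for an error recorded earlier, e.g. by a background thread.
    static RuntimeException recordError() {
        return new IllegalStateException("fenced");
    }

    // Rethrows the stored exception as-is: its stack trace still points at recordError().
    static void failDirectly(RuntimeException stored) {
        throw stored;
    }

    // Wraps the stored exception: the wrapper's stack trace is captured in this method.
    static void failWrapped(RuntimeException stored) {
        throw new RuntimeException("operation failed", stored);
    }

    // Name of the method at the top of the throwable's recorded stack trace.
    static String topFrame(Throwable t) {
        return t.getStackTrace()[0].getMethodName();
    }

    public static void main(String[] args) {
        RuntimeException stored = recordError();
        try {
            failDirectly(stored);
        } catch (RuntimeException e) {
            System.out.println("direct:  top frame = " + topFrame(e));
        }
        try {
            failWrapped(stored);
        } catch (RuntimeException e) {
            System.out.println("wrapped: top frame = " + topFrame(e)
                + ", cause preserved = " + (e.getCause() == stored));
        }
    }
}
```

Whether the producer should wrap or rethrow is the open question in this thread; the sketch only illustrates what each choice does to the stack trace a caller sees.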






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235808438


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+sessionTimeoutMs > groupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, 
isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+String joinReason = request.reason();
+if (joinReason == null || joinReason.isEmpty()) {
+joinReason = "not provided";
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+);
+
+} else if (isUnknownMember) {
+result = genericGroupJoinNewMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+} else {
+result = genericGroupJoinExistingMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+}
+
+// Attempt to complete join group phase. We do not complete
+// the join group phase if this is the initial rebalance.
+if (group.isInState(PREPARING_REBALANCE) &&
+group.hasAllMembersJoined() &&
+group.generationId() != 0
+) {
+completeGenericGroupJoin(group);
+}
+}
+
+return result;
+}
+
+/**
+ * Handle a new member generic group join.
+ *
+ * @param context The request context.
+ * @param request The join group request.
+ * @param group   The group to add the member.
+ * @param joinReason  The client reason for the join request.
+ * @param responseFuture  The response future to complete.
+ *
+ * @return The coordinator result that will be appended to the log.
+ */
+private CoordinatorResult, Record> 
genericGroupJoinNewMember(
+RequestContext context,
+JoinGroupRequestData request,
+GenericGroup group,
+String joinReason,
+CompletableFuture responseFuture
+) {
+List protocols = new ArrayList<>();
+request.protocols().forEach(protocol -> protocols.add(new Protocol(protocol.name(), protocol.metadata())));
+if (group.isInState(DEAD)) {
+// if the group is marked as dead, it means some other thread has 
just removed the group
+// from the coordinator metadata; it is likely that the group has 
migrated to some other
+// coordinator OR the group is in a transient unstable phase. Let 
the member retry
+// finding the correct coordinator and rejoin.
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-20 Thread via GitHub


divijvaidya commented on code in PR #13868:
URL: https://github.com/apache/kafka/pull/13868#discussion_r1235807347


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -75,31 +75,31 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.ArgumentMatchers.{any, anyInt, anyMap, anySet, anyString}
-import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, 
times, verify, verifyNoMoreInteractions, when}
+import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, 
spy, times, verify, verifyNoMoreInteractions, when}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.compat.java8.OptionConverters.RichOptionForJava8
 import scala.jdk.CollectionConverters._
 
 class ReplicaManagerTest {
 
-  val topic = "test-topic"
-  val topicId = Uuid.randomUuid()
-  val topicIds = scala.Predef.Map("test-topic" -> topicId)
-  val topicNames = scala.Predef.Map(topicId -> "test-topic")
-  val time = new MockTime
-  val scheduler = new MockScheduler(time)
-  val metrics = new Metrics
-  var alterPartitionManager: AlterPartitionManager = _
-  var config: KafkaConfig = _
-  var quotaManager: QuotaManagers = _
-  var mockRemoteLogManager: RemoteLogManager = _
+  private val topic = "test-topic"
+  private val topicId: Uuid = Uuid.randomUuid()

Review Comment:
   Removed the annotation.






[GitHub] [kafka] jolshan commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13868:
URL: https://github.com/apache/kafka/pull/13868#discussion_r1235803368


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -75,31 +75,31 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.ArgumentMatchers.{any, anyInt, anyMap, anySet, anyString}
-import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, 
times, verify, verifyNoMoreInteractions, when}
+import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, 
spy, times, verify, verifyNoMoreInteractions, when}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.compat.java8.OptionConverters.RichOptionForJava8
 import scala.jdk.CollectionConverters._
 
 class ReplicaManagerTest {
 
-  val topic = "test-topic"
-  val topicId = Uuid.randomUuid()
-  val topicIds = scala.Predef.Map("test-topic" -> topicId)
-  val topicNames = scala.Predef.Map(topicId -> "test-topic")
-  val time = new MockTime
-  val scheduler = new MockScheduler(time)
-  val metrics = new Metrics
-  var alterPartitionManager: AlterPartitionManager = _
-  var config: KafkaConfig = _
-  var quotaManager: QuotaManagers = _
-  var mockRemoteLogManager: RemoteLogManager = _
+  private val topic = "test-topic"
+  private val topicId: Uuid = Uuid.randomUuid()

Review Comment:
   Do we want to keep the annotation now that it's private?






[GitHub] [kafka] C0urante merged pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


C0urante merged PR #13885:
URL: https://github.com/apache/kafka/pull/13885





[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235772593


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235773175


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235752587


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+sessionTimeoutMs > groupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, 
isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+String joinReason = request.reason();
+if (joinReason == null || joinReason.isEmpty()) {
+joinReason = "not provided";
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+.setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+);
+
+} else if (isUnknownMember) {
+result = genericGroupJoinNewMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+} else {
+result = genericGroupJoinExistingMember(
+context,
+request,
+group,
+joinReason,
+responseFuture
+);
+}
+
+// Attempt to complete join group phase. We do not complete
+// the join group phase if this is the initial rebalance.
+if (group.isInState(PREPARING_REBALANCE) &&
+group.hasAllMembersJoined() &&
+group.generationId() != 0
+) {
+completeGenericGroupJoin(group);
+}
+}
+
+return result;
+}
+
+/**
+ * Handle a new member generic group join.
+ *
+ * @param context The request context.
+ * @param request The join group request.
+ * @param group   The group to add the member.
+ * @param joinReason  The client reason for the join request.
+ * @param responseFuture  The response future to complete.
+ *
+ * @return The coordinator result that will be appended to the log.
+ */
+private CoordinatorResult, Record> 
genericGroupJoinNewMember(
+RequestContext context,
+JoinGroupRequestData request,
+GenericGroup group,
+String joinReason,
+CompletableFuture responseFuture
+) {
+List protocols = new ArrayList<>();
+request.protocols().forEach(protocol -> protocols.add(new Protocol(protocol.name(), protocol.metadata())));
+if (group.isInState(DEAD)) {
+// if the group is marked as dead, it means some other thread has 
just removed the group
+// from the coordinator metadata; it is likely that the group has 
migrated to some other
+// coordinator OR the group is in a transient unstable phase. Let 
the member retry
+// finding the correct coordinator and rejoin.
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(UNKNOWN_MEMBER_ID)
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235733198


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+sessionTimeoutMs > groupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, 
isUnknownMember);

Review Comment:
   I don't think this fits well because we may have other records to append in 
this method. As mentioned above, I will store the generic groups in a separate 
hash map to prevent this from happening.
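
As a rough illustration of the "separate hash map" idea, here is a minimal Java sketch. It is an assumption about the shape of such a change, not the code from the PR: `GroupStore`, the nested group classes, and `getOrMaybeCreateGenericGroup` are hypothetical names, and plain `HashMap`s stand in for whatever data structure the coordinator actually uses. Each group type gets its own typed map, so looking up a generic group needs no cast.

```java
import java.util.HashMap;
import java.util.Map;

class GroupStore {
    // Hypothetical, simplified stand-ins for the coordinator's group types.
    static class GenericGroup {
        final String groupId;
        GenericGroup(String groupId) { this.groupId = groupId; }
    }

    static class ConsumerGroup {
        final String groupId;
        ConsumerGroup(String groupId) { this.groupId = groupId; }
    }

    // One typed map per group type, both keyed by group id.
    private final Map<String, GenericGroup> genericGroups = new HashMap<>();
    private final Map<String, ConsumerGroup> consumerGroups = new HashMap<>();

    // Returns the generic group, creating it only when createIfNotExists is true.
    GenericGroup getOrMaybeCreateGenericGroup(String groupId, boolean createIfNotExists) {
        if (consumerGroups.containsKey(groupId)) {
            throw new IllegalStateException("Group " + groupId + " is not a generic group");
        }
        GenericGroup group = genericGroups.get(groupId);
        if (group == null) {
            if (!createIfNotExists) {
                throw new IllegalArgumentException("Group " + groupId + " not found");
            }
            group = new GenericGroup(groupId);
            genericGroups.put(groupId, group);
        }
        return group;
    }
}
```

With this layout a join request carrying an unknown member id would pass `createIfNotExists = true`, and a request with a known member id would pass `false` and be rejected if the group is missing, mirroring the comment in the quoted diff.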






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235734010


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+// Below stores all methods to handle generic group APIs.
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> 
genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;

Review Comment:
   See https://github.com/apache/kafka/pull/13870#discussion_r1235653740






[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235696555


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter writer;
+        private CoordinatorLoader loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter writer) {
+            this.writer = 

[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235694270


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter writer;
+        private CoordinatorLoader loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter writer) {
+            this.writer = 

[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235693242


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter writer;
+        private CoordinatorLoader loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter writer) {
+            this.writer = 

[GitHub] [kafka] mumrah commented on pull request #13890: KAFKA-15109 Don't skip leader epoch bump while in migration mode

2023-06-20 Thread via GitHub


mumrah commented on PR #13890:
URL: https://github.com/apache/kafka/pull/13890#issuecomment-1599310423

   This system test was failing on trunk; with this patch it passes again.
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.3
   session_id:       2023-06-20--002
   run time:         5 minutes 48.107 seconds
   tests run:        2
   passed:           2
   flaky:            0
   failed:           0
   ignored:          0

   test_id:    kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False
   status:     PASS
   run time:   2 minutes 51.979 seconds

   test_id:    kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True
   status:     PASS
   run time:   2 minutes 55.931 seconds
   

   ```





[GitHub] [kafka] divijvaidya commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-20 Thread via GitHub


divijvaidya commented on PR #13850:
URL: https://github.com/apache/kafka/pull/13850#issuecomment-1599308163

   Thank you @showuon and @satishd for your review so far. I have added another 
commit to de-flake some tests, improved thread safety for `Entry`, added new 
unit tests and added log statements that help in debugging. The flakiness was 
caused by how the tests were written earlier (overriding class members with 
spied variables).
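
As a rough illustration of what "improved thread safety for `Entry`" could involve, here is a minimal sketch that guards an entry's state with a read-write lock. It is not the code from this PR: the class `EntrySketch`, its fields, and its methods are hypothetical names chosen for the example, and the only library type used is `java.util.concurrent.locks.ReentrantReadWriteLock`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a cache entry; this is an assumption for
// illustration, not the Entry class touched by the PR.
final class EntrySketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final long[] sortedOffsets;
    // Guarded by lock: once set, lookups are rejected.
    private boolean markedForCleanup = false;

    EntrySketch(long[] sortedOffsets) {
        this.sortedOffsets = sortedOffsets.clone();
    }

    // Returns the largest stored offset <= target, or -1 if there is none.
    long lookupFloor(long target) {
        lock.readLock().lock();
        try {
            if (markedForCleanup) {
                throw new IllegalStateException("Entry is marked for cleanup");
            }
            long floor = -1L;
            for (long offset : sortedOffsets) {
                if (offset > target) {
                    break;
                }
                floor = offset;
            }
            return floor;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Takes the write lock, so it waits for in-flight lookups to finish.
    void markForCleanup() {
        lock.writeLock().lock();
        try {
            markedForCleanup = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isMarkedForCleanup() {
        lock.readLock().lock();
        try {
            return markedForCleanup;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

In this sketch, lookups can run concurrently under the read lock, while marking for cleanup takes the write lock, so a lookup never observes the flag changing halfway through.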
   
   Requesting one last review cycle from you folks!





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235653740


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(
 String groupId,
+Group.GroupType groupType,
 boolean createIfNotExists
-) throws GroupIdNotFoundException {
+) throws InvalidGroupIdException, GroupIdNotFoundException {
+if (groupId == null || groupId.isEmpty()) {
+throw new InvalidGroupIdException(String.format("Group id %s is invalid.", groupId));
+}

Review Comment:
   I removed this; we have request validation in 
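
For readers following the hunk above, the get-or-maybe-create flow with the group id check (the check that this comment says was removed in favour of request validation) can be sketched in isolation. This is a simplified illustration, not the PR's implementation: `GroupRegistrySketch`, the nested exception classes, `GroupType`, and `Group` are hypothetical stand-ins, and a plain `HashMap` replaces the `TimelineHashMap` used in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch; all names here are assumptions made for the example.
final class GroupRegistrySketch {
    // Hypothetical stand-in for Kafka's InvalidGroupIdException.
    static final class InvalidGroupIdException extends RuntimeException {
        InvalidGroupIdException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for Kafka's GroupIdNotFoundException.
    static final class GroupIdNotFoundException extends RuntimeException {
        GroupIdNotFoundException(String message) {
            super(message);
        }
    }

    enum GroupType { GENERIC, CONSUMER }

    static final class Group {
        final String groupId;
        final GroupType type;

        Group(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    Group getOrMaybeCreateGroup(String groupId, GroupType groupType, boolean createIfNotExists) {
        // The validation shown in the hunk: reject null or empty ids up front.
        if (groupId == null || groupId.isEmpty()) {
            throw new InvalidGroupIdException(String.format("Group id %s is invalid.", groupId));
        }
        Group group = groups.get(groupId);
        if (group == null) {
            if (!createIfNotExists) {
                throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
            }
            group = new Group(groupId, groupType);
            groups.put(groupId, group);
            return group;
        }
        // Per the Javadoc in the hunk, a group of the wrong type is reported as not found.
        if (group.type != groupType) {
            throw new GroupIdNotFoundException(
                String.format("Group %s is not a %s group.", groupId, groupType));
        }
        return group;
    }
}
```

If the id is already validated at the request layer, as the comment suggests, the first `if` becomes redundant and the method only needs the lookup, create, and type-check steps.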

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235651106


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter writer;
+        private CoordinatorLoader loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter writer) {
+            this.writer = 

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235650169


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter writer;
+        private CoordinatorLoader loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter writer) {
+            this.writer = 

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235640808


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1235639793


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] ijuma merged pull request #13840: MINOR: Upgrade Scala for Java 20/21 support

2023-06-20 Thread via GitHub


ijuma merged PR #13840:
URL: https://github.com/apache/kafka/pull/13840





[GitHub] [kafka] ijuma commented on pull request #13840: MINOR: Upgrade Scala for Java 20/21 support

2023-06-20 Thread via GitHub


ijuma commented on PR #13840:
URL: https://github.com/apache/kafka/pull/13840#issuecomment-1599220710

   JDK 11 build passed; there were two unrelated failures outside of that:
   
   > Build / JDK 17 and Scala 2.13 / testMaxConnectionsPerIp() – kafka.network.SocketServerTest (15s)
   > Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest





[jira] [Updated] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15109:
-
Affects Version/s: 3.6.0
   (was: 3.5.0)

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Priority: Critical
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.
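
A toy model of why the bump matters, as a sketch only. It assumes (this is an assumption for illustration, not taken from the Kafka source) that a ZK broker applies a LeaderAndIsr update only when its leader epoch is strictly greater than the epoch the broker already holds. The class and method names are hypothetical.

```python
class ToyPartitionState:
    """Hypothetical stand-in for a ZK broker's view of one partition."""

    def __init__(self, leader_epoch, isr):
        self.leader_epoch = leader_epoch
        self.isr = list(isr)

    def on_leader_and_isr(self, leader_epoch, isr):
        """Apply the update only if the epoch advanced; return True if applied."""
        if leader_epoch <= self.leader_epoch:
            return False  # treated as stale or duplicate, so nothing changes
        self.leader_epoch = leader_epoch
        self.isr = list(isr)
        return True


state = ToyPartitionState(leader_epoch=5, isr=[1, 2, 3])
# Same epoch: the ISR change is ignored under the assumed rule.
applied_without_bump = state.on_leader_and_isr(leader_epoch=5, isr=[1, 2])
# Bumped epoch: the ISR change is applied.
applied_with_bump = state.on_leader_and_isr(leader_epoch=6, isr=[1, 2])
```

Under that assumption, a controller that skips the epoch bump sends an update the broker never acts on, which matches the behavior described above.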



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-06-20 Thread via GitHub


clayburn commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1599144163

   @ijuma - good question. I do not have an answer myself, but I can get in 
touch with those involved in the agreement between ASF and Gradle and get back 
to you on that case.





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13878: MINOR: Move RocksDBTimeOrderedKeyValueBufferTest to use Junit5

2023-06-20 Thread via GitHub


wcarlson5 commented on code in PR #13878:
URL: https://github.com/apache/kafka/pull/13878#discussion_r1235507430


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -32,31 +32,28 @@
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   It shouldn't be necessary; it already fails on unused stubs.






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13878: MINOR: Move RocksDBTimeOrderedKeyValueBufferTest to use Junit5

2023-06-20 Thread via GitHub


wcarlson5 commented on code in PR #13878:
URL: https://github.com/apache/kafka/pull/13878#discussion_r1235506750


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -32,31 +32,28 @@
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RocksDBTimeOrderedKeyValueBufferTest {
 public RocksDBTimeOrderedKeyValueBuffer buffer;
-@Mock

Review Comment:
   It wasn't working, but the mock() was






[jira] [Updated] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15109:
-
Summary: ISR shrink/expand issues on ZK brokers during migration  (was: ISR 
not expanding on ZK brokers during migration)

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.5.0
>Reporter: David Arthur
>Priority: Critical
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.





[jira] [Updated] (KAFKA-15109) ISR not expanding on ZK brokers during migration

2023-06-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15109:
-
Description: 
KAFKA-15021 introduced a new controller behavior that avoids increasing the 
leader epoch during the controlled shutdown scenario. This prevents some 
unnecessary thrashing of metadata and threads on the brokers and clients. 

While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers must 
have the leader epoch bump in order for ReplicaManager to react to the 
LeaderAndIsrRequest.

  was:
KAFKA-15021 introduced a new controller behavior that avoids increasing the 
leader epoch during the controlled shutdown scenario. This prevents some 
unnecessary thrashing of metadata and threads on the brokers and clients. 

While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers must 
have the leader epoch bump in order for the ISR expansion to complete.


> ISR not expanding on ZK brokers during migration
> 
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.5.0
>Reporter: David Arthur
>Priority: Critical
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-20 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235497178


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should 
be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(

Review Comment:
   I think we should store generic groups separately. The added benefit here is 
that we wouldn't have to create a new record when a new group is created, as 
you mentioned in the comment below. WDYT?




[jira] [Created] (KAFKA-15109) ISR not expanding on ZK brokers during migration

2023-06-20 Thread David Arthur (Jira)
David Arthur created KAFKA-15109:


 Summary: ISR not expanding on ZK brokers during migration
 Key: KAFKA-15109
 URL: https://issues.apache.org/jira/browse/KAFKA-15109
 Project: Kafka
  Issue Type: Bug
  Components: kraft, replication
Affects Versions: 3.5.0
Reporter: David Arthur


KAFKA-15021 introduced a new controller behavior that avoids increasing the 
leader epoch during the controlled shutdown scenario. This prevents some 
unnecessary thrashing of metadata and threads on the brokers and clients. 

While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers must 
have the leader epoch bump in order for the ISR expansion to complete.





[GitHub] [kafka] mimaison commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-06-20 Thread via GitHub


mimaison commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1599080971

   I've not set up the system tests recently. If the provided instructions are 
not clear enough, let's discuss it on the dev mailing list. You can also open 
Jiras for the issues you encounter.





[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-06-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735344#comment-17735344
 ] 

ASF GitHub Bot commented on KAFKA-14995:


mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1599054544

   Let's sort out https://github.com/apache/kafka/pull/13842 first but this 
will need to be updated to include our new committer.




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
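
The manual steps above could be scripted roughly as follows. This is a minimal sketch, not the script from the PR: it assumes the `git shortlog --email --numbered --summary` output has already been captured as text, and that a set of committer emails is available from somewhere (the `committer_emails` argument, the function names, and the sample data are all hypothetical). Mapping authors to Github usernames (step 4) is left out.

```python
import re

# One line of `git shortlog --email --numbered --summary` output looks like:
#   "    42\tJane Doe <jane@example.org>"
SHORTLOG_LINE = re.compile(r"^\s*(\d+)\s+(.*?)\s+<([^>]*)>\s*$")


def parse_shortlog(text):
    """Return (commit_count, name, email) tuples in the order git printed them."""
    authors = []
    for line in text.splitlines():
        match = SHORTLOG_LINE.match(line)
        if match:
            count, name, email = match.groups()
            authors.append((int(count), name, email.lower()))
    return authors


def top_non_committers(shortlog_text, committer_emails, limit=20):
    """Steps 1-3: rank authors by commit count, drop committers, truncate."""
    committers = {email.lower() for email in committer_emails}
    ranked = sorted(parse_shortlog(shortlog_text), key=lambda a: a[0], reverse=True)
    return [a for a in ranked if a[2] not in committers][:limit]


# Hypothetical sample data, for illustration only.
sample = (
    "    30\tAlice Committer <alice@example.org>\n"
    "    12\tBob Contributor <bob@example.org>\n"
    "     7\tCarol Contributor <carol@example.org>\n"
)
result = top_non_committers(sample, {"alice@example.org"}, limit=20)
```

Filtering by email is only one possible key; as noted above, the committers page would first need to expose each committer's Git author email for this to work.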





[GitHub] [kafka] mimaison commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-06-20 Thread via GitHub


mimaison commented on code in PR #13842:
URL: https://github.com/apache/kafka/pull/13842#discussion_r1235448777


##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os

Review Comment:
   We need to have the Apache license at the top of the file.



##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()
+collaborators = []
+for contributor in contributors:
+    if contributor.login not in committer_logins:
+        collaborators += [contributor.login]
+refreshed_collaborators = collaborators[:n]
+
+### UPDATE asf.yaml ###
+file_path = ".asf.yaml"
+file = repo.get_contents(file_path)
+yaml_content = yaml.safe_load(file.decoded_content)
+
+# Update 'github_whitelist' list
+github_whitelist = refreshed_collaborators[:10]  # New users to be added
+yaml_content["jenkins"]["github_whitelist"] = github_whitelist
+
+# Update 'collaborators' list
+collaborators = refreshed_collaborators[:10]  # New collaborators to be added
+yaml_content["github"]["collaborators"] = collaborators
+
+# Convert the updated content back to YAML
+updated_yaml = yaml.safe_dump(yaml_content)
+
+# Commit and push the changes
+commit_message = "Update .asf.yaml file with refreshed github_whitelist, and collaborators"

Review Comment:
   We tend to prefix commits with either a Jira key or `MINOR`. Maybe we can use 
`MINOR:` here.
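
One way to check a title against that convention, as a rough sketch. The exact rule is an assumption inferred from PR titles seen on this list (a `KAFKA-<number>` key or `MINOR`, followed by `:` or `;`); the helper name is hypothetical.

```python
import re

# Assumed convention: a title starts with "KAFKA-<number>" or "MINOR",
# followed by ":" or ";" and a space.
TITLE_PREFIX = re.compile(r"^(KAFKA-\d+|MINOR)[:;] ")


def has_conventional_prefix(title):
    """Return True if the title starts with a Jira key or MINOR prefix."""
    return TITLE_PREFIX.match(title) is not None


commit_message = "MINOR: Update .asf.yaml file with refreshed github_whitelist and collaborators"
prefixed = has_conventional_prefix(commit_message)
```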



##
.github/workflows/refresh-collaborators.yaml:
##
@@ -0,0 +1,24 @@
+name: Refresh asf.yaml collaborators every 3 months

Review Comment:
   We need to have the Apache license at the top of the file.



##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()
+collaborators = []
+for contributor in contributors:
+    if contributor.login not in committer_logins:
+        collaborators += [contributor.login]
+refreshed_collaborators = collaborators[:n]
+
+### UPDATE asf.yaml ###
+file_path = ".asf.yaml"
+file = repo.get_contents(file_path)
+yaml_content = yaml.safe_load(file.decoded_content)
+
+# Update 'github_whitelist' list
+github_whitelist = refreshed_collaborators[:10]  # New users to be added

Review Comment:
   Isn't the length of `refreshed_collaborators` already 10? It's been assigned 
`collaborators[:n]` where `n` is 10.
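
The point can be checked in isolation. In this small sketch the login names are made up; only the slicing mirrors the script.

```python
collaborators = [f"user{i}" for i in range(25)]  # hypothetical logins
n = 10
refreshed_collaborators = collaborators[:n]

# Slicing again with the same bound returns an equal list, so the second
# `[:10]` in the script has no effect beyond making a copy.
github_whitelist = refreshed_collaborators[:10]
same_contents = github_whitelist == refreshed_collaborators
```

Assigning `refreshed_collaborators` directly (or slicing with `[:n]` once) would keep the limit defined in a single place.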






[GitHub] [kafka] C0urante commented on pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


C0urante commented on PR #13885:
URL: https://github.com/apache/kafka/pull/13885#issuecomment-1599050843

   Thanks! Will merge pending CI build.





[GitHub] [kafka] xiaocairush commented on a diff in pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


xiaocairush commented on code in PR #13885:
URL: https://github.com/apache/kafka/pull/13885#discussion_r1235460018


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback 
stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;

Review Comment:
   resolved






[GitHub] [kafka] xiaocairush commented on a diff in pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


xiaocairush commented on code in PR #13885:
URL: https://github.com/apache/kafka/pull/13885#discussion_r1235458225


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -271,12 +271,12 @@ public synchronized void shutdown() {
 
 void doShutdown() {
 try {
-TargetState preEmptedState = 
pendingTargetStateChange.getAndSet(null);
+TargetState preEmptiedState = 
pendingTargetStateChange.getAndSet(null);

Review Comment:
   Have reverted all false-positive fixes in WorkerConnector.






[GitHub] [kafka] xiaocairush commented on a diff in pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


xiaocairush commented on code in PR #13885:
URL: https://github.com/apache/kafka/pull/13885#discussion_r1235458225


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -271,12 +271,12 @@ public synchronized void shutdown() {
 
 void doShutdown() {
 try {
-TargetState preEmptedState = 
pendingTargetStateChange.getAndSet(null);
+TargetState preEmptiedState = 
pendingTargetStateChange.getAndSet(null);

Review Comment:
   Reverted all false-positive fixes in WorkerConnector.






[GitHub] [kafka] xiaocairush commented on a diff in pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


xiaocairush commented on code in PR #13885:
URL: https://github.com/apache/kafka/pull/13885#discussion_r1235455670


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;
+TargetState preEmptiedState;

Review Comment:
   resolved



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;
+TargetState preEmptiedState;
 synchronized (this) {
-preEmptedStateChangeCallback = pendingStateChangeCallback.getAndSet(stateChangeCallback);
-preEmptedState = pendingTargetStateChange.getAndSet(targetState);
+preEmptiedStateChangeCallback = pendingStateChangeCallback.getAndSet(stateChangeCallback);
+preEmptiedState = pendingTargetStateChange.getAndSet(targetState);
 notify();
 }
-if (preEmptedStateChangeCallback != null) {
-preEmptedStateChangeCallback.onCompletion(
+if (preEmptiedStateChangeCallback != null) {
+preEmptiedStateChangeCallback.onCompletion(
 new ConnectException(
-"Could not begin changing connector state to " + preEmptedState.name()
+"Could not begin changing connector state to " + preEmptiedState.name()
 + " before another request to change state was made;"
 + " the new request (which is to change the state to " + targetState.name()
-+ ") has pre-empted this one"),
++ ") has pre-emptied this one"),

Review Comment:
   resolved






[GitHub] [kafka] jolshan commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-20 Thread via GitHub


jolshan commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1235450231


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional updateLatestMetadata(
 // Between the time that a topic is deleted and re-created, the client may lose track of the
 // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
 // topicId, we allow the corresponding leader epoch to override the last seen value.
-log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
- tp, newEpoch, oldTopicId, topicId);
+if (oldTopicId != null) {
+log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+tp, newEpoch, oldTopicId, topicId);
+} else {
+log.debug("Resetting the last seen epoch of partition {} to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   The change should remove the logging on startup, which is where the main 
noise was found (i.e., going from null to having a topic ID).
   
   Are you still seeing that log message? If so which version are you running 
on?
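
For illustration, the hunk under review can be read as a log-level choice keyed on whether the old topic ID was known. Below is a minimal, self-contained Java sketch of that choice. It is not the code in Metadata.java: the class `EpochResetLogSketch`, the method `logEpochReset`, and the in-memory list standing in for a logger are all assumptions made up for this example, and topic IDs are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the epoch-reset logging in the hunk; not Kafka's Metadata class.
class EpochResetLogSketch {
    // Collected "LEVEL message" lines, so the sketch needs no logging library.
    final List<String> lines = new ArrayList<>();

    // Returns the level that was used: INFO when a known topic ID changed,
    // DEBUG when the topic ID was previously unknown (null).
    String logEpochReset(String tp, int newEpoch, String oldTopicId, String topicId) {
        if (oldTopicId != null) {
            lines.add("INFO Resetting the last seen epoch of partition " + tp + " to " + newEpoch
                + " since the associated topicId changed from " + oldTopicId + " to " + topicId);
            return "INFO";
        }
        lines.add("DEBUG Resetting the last seen epoch of partition " + tp + " to " + newEpoch
            + " since the associated topicId was undefined but is now set to " + topicId);
        return "DEBUG";
    }

    public static void main(String[] args) {
        EpochResetLogSketch log = new EpochResetLogSketch();
        log.logEpochReset("orders-0", 3, null, "id-a");   // startup case: no previous topic ID
        log.logEpochReset("orders-0", 4, "id-a", "id-b"); // topic was deleted and re-created
        log.lines.forEach(System.out::println);
    }
}
```

With this split, a client that starts up and learns topic IDs for the first time would emit only debug lines, while a genuine topic ID change would still be visible at info level.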






[GitHub] [kafka] ijuma commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-06-20 Thread via GitHub


ijuma commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1599025907

   As long as there are no restrictions due to this proprietary plugin, I am ok 
with it. That is, others (including several companies) run these builds too. Is 
there some official comms from Apache with regards to this?





[GitHub] [kafka] C0urante commented on a diff in pull request #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


C0urante commented on code in PR #13885:
URL: https://github.com/apache/kafka/pull/13885#discussion_r1235437562


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;
+TargetState preEmptiedState;

Review Comment:
   Also incorrect



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;

Review Comment:
   Also incorrect



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -325,20 +325,20 @@ public boolean awaitShutdown(long timeoutMs) {
 }
 
 public void transitionTo(TargetState targetState, Callback stateChangeCallback) {
-Callback preEmptedStateChangeCallback;
-TargetState preEmptedState;
+Callback preEmptiedStateChangeCallback;
+TargetState preEmptiedState;
 synchronized (this) {
-preEmptedStateChangeCallback = pendingStateChangeCallback.getAndSet(stateChangeCallback);
-preEmptedState = pendingTargetStateChange.getAndSet(targetState);
+preEmptiedStateChangeCallback = pendingStateChangeCallback.getAndSet(stateChangeCallback);
+preEmptiedState = pendingTargetStateChange.getAndSet(targetState);
 notify();
 }
-if (preEmptedStateChangeCallback != null) {
-preEmptedStateChangeCallback.onCompletion(
+if (preEmptiedStateChangeCallback != null) {
+preEmptiedStateChangeCallback.onCompletion(
 new ConnectException(
-"Could not begin changing connector state to " + preEmptedState.name()
+"Could not begin changing connector state to " + preEmptiedState.name()
 + " before another request to change state was made;"
 + " the new request (which is to change the state to " + targetState.name()
-+ ") has pre-empted this one"),
++ ") has pre-emptied this one"),

Review Comment:
   Also incorrect



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -271,12 +271,12 @@ public synchronized void shutdown() {
 
 void doShutdown() {
 try {
-TargetState preEmptedState = pendingTargetStateChange.getAndSet(null);
+TargetState preEmptiedState = pendingTargetStateChange.getAndSet(null);

Review Comment:
   This change is incorrect; "pre-empted" is the intended term.
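
To show why "pre-empted" fits here, the following is a small Java sketch of the pattern visible in the diff: a newer request atomically replaces a pending one and receives the displaced value back. It is an illustration only, not WorkerConnector itself. The class `PreEmptionSketch`, the method `takePending`, and the use of plain strings for target states are assumptions for this example; only `AtomicReference.getAndSet` is taken from the quoted code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a single-slot pending request; not Kafka's WorkerConnector.
class PreEmptionSketch {
    private final AtomicReference<String> pendingTargetState = new AtomicReference<>();

    // Installs a new pending state and returns the one it pre-empted,
    // or null if nothing was pending.
    String transitionTo(String targetState) {
        return pendingTargetState.getAndSet(targetState);
    }

    // Claims the pending state for processing, leaving the slot empty.
    String takePending() {
        return pendingTargetState.getAndSet(null);
    }

    public static void main(String[] args) {
        PreEmptionSketch connector = new PreEmptionSketch();
        String first = connector.transitionTo("PAUSED");   // nothing pending yet
        String second = connector.transitionTo("STARTED"); // displaces the PAUSED request
        System.out.println("first call pre-empted: " + first);
        System.out.println("second call pre-empted: " + second);
    }
}
```

The value returned by `getAndSet` is the request that lost its turn to a later one, which is what the variable name `preEmptedState` describes.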






[GitHub] [kafka] jsancio commented on a diff in pull request #13845: KAFKA-15078; KRaft leader replys with snapshot for offset 0

2023-06-20 Thread via GitHub


jsancio commented on code in PR #13845:
URL: https://github.com/apache/kafka/pull/13845#discussion_r1235409277


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest(
 long fetchOffset = request.fetchOffset();
 int lastFetchedEpoch = request.lastFetchedEpoch();
 LeaderState state = quorum.leaderStateOrThrow();
-ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+Optional latestSnapshotId = log.latestSnapshotId();
+final ValidOffsetAndEpoch validOffsetAndEpoch;
+if (fetchOffset == 0 && latestSnapshotId.isPresent()) {

Review Comment:
   Thanks @dengziming 
   
   We had a similar conversation in another PR: 
https://github.com/apache/kafka/pull/13834#discussion_r1224779841
   
   In short it is not clear to me that these improvements (implementation 
complexities) are a big win for the cluster metadata partition.






[jira] [Commented] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions

2023-06-20 Thread Elkhan Eminov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735327#comment-17735327
 ] 

Elkhan Eminov commented on KAFKA-15075:
---

[~durban] hi there, is this up for grabs or are you planning to work on this?

> MM2 internal checkpoints topic should support multiple partitions
> -
>
> Key: KAFKA-15075
> URL: https://issues.apache.org/jira/browse/KAFKA-15075
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, the internal checkpoints topic of MM2 uses a single partition.
> This is an unnecessary limitation, and instead, it should support more 
> partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15087) Move InterBrokerSendThread to server-commons module

2023-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15087.
-
Fix Version/s: 3.6.0
 Reviewer: David Jacot
   Resolution: Fixed

> Move InterBrokerSendThread to server-commons module
> ---
>
> Key: KAFKA-15087
> URL: https://issues.apache.org/jira/browse/KAFKA-15087
> Project: Kafka
>  Issue Type: Task
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
> Fix For: 3.6.0
>
>
> Similar to the move of {{ShutdownableThread}} done with KAFKA-14706.





[GitHub] [kafka] dajac merged pull request #13856: KAFKA-15087 Move/rewrite InterBrokerSendThread to server-commons

2023-06-20 Thread via GitHub


dajac merged PR #13856:
URL: https://github.com/apache/kafka/pull/13856





[GitHub] [kafka] xiaocairush opened a new pull request, #13889: MINOR: Fix typos for meatadata

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13889:
URL: https://github.com/apache/kafka/pull/13889

   *More detailed description of your change,
   Fix typos for meatadata*
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mimaison commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-06-20 Thread via GitHub


mimaison commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1598926367

   Thanks, this is a useful improvement. I was not aware the ASF had a GE 
instance. The changes look good to me. 
   
   @ijuma any concerns?





[GitHub] [kafka] xiaocairush commented on pull request #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13888:
URL: https://github.com/apache/kafka/pull/13888#issuecomment-1598872442

   > nice catches!
   
   Thanks, there are still a few typos in other modules. I will commit fixes for 
them after these PRs are merged.





[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-06-20 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735299#comment-17735299
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-7739:
-

Thanks [~showuon]. My impression was that there's some work already planned for 
this. Will wait for [~satish.duggana] feedback to confirm this. Cheers!

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]





[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage

2023-06-20 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735298#comment-17735298
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-14953:
--

[~showuon] I pressed add too soon. I confused this with the PR referenced in the 
ticket description, which is already merged, but it's only a reference.

Anyway, thanks [~abhijeetkumar] for the update!

> Add metrics for tiered storage
> --
>
> Key: KAFKA-14953
> URL: https://issues.apache.org/jira/browse/KAFKA-14953
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Abhijeet Kumar
>Priority: Minor
>
> Not just for expired fetch. We also need to add all the metrics described in 
> KIP-405
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
>  
> ref: [https://github.com/apache/kafka/pull/13535#discussion_r1180286031] 





[GitHub] [kafka] xiaocairush opened a new pull request, #13888: MINOR: Fix typos for streams

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13888:
URL: https://github.com/apache/kafka/pull/13888

   *More detailed description of your change,
   Fix typos for streams*
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] xiaocairush opened a new pull request, #13887: MINOR: Fix typos for server common

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13887:
URL: https://github.com/apache/kafka/pull/13887

   *More detailed description of your change,
   Fix typos for server common*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-20 Thread via GitHub


machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598817722

   > @machi1990 , could you explain more about this:
   > 
   > > The test neither calls commitSync, nor commitAsync which means that the 
cache is never updated in [2] after initially set in [1].
   > 
   > If it doesn't commit anything, then the expected committed value should be 
0, right? And if the expected values are all greater than 0, something must be 
committing offsets, right? It could be auto commit on the consumer 
side. (maybe?)
   
   Sorry @showuon , I should have clarified: There is no manual sync/async 
commit of offsets.  
   The offsets are all greater than 0. The consumer doesn't auto commit, as 
can be seen in the initialization here: 
https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L774
 
   Offset commits happen elsewhere, via 
`producer.sendOffsetsToTransaction(..)`, e.g. 
https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L247
   
   I didn't think of this earlier and it changes a few things, making me 
think that I might need to revisit the caching logic for committed offsets, e.g. 
re-use `consumer#position(tp)`.
   
   I am keen to know what you think? 
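
The staleness discussed in this thread can be sketched in a few lines of Java. This is a hypothetical model, not ConsumerCoordinator: the class `CommittedOffsetCacheSketch`, its two lookup methods, and the map standing in for the group coordinator's committed offsets are assumptions for illustration. The point it shows is that a client-side cache filled on first fetch cannot see commits made through another path, such as a transactional producer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a client-side committed-offset cache; not Kafka's ConsumerCoordinator.
class CommittedOffsetCacheSketch {
    // Stand-in for the committed offsets held by the group coordinator.
    private final Map<String, Long> coordinatorOffsets;
    // Client-side cache, filled on first lookup and never invalidated.
    private final Map<String, Long> cache = new HashMap<>();

    CommittedOffsetCacheSketch(Map<String, Long> coordinatorOffsets) {
        this.coordinatorOffsets = coordinatorOffsets;
    }

    // Returns the cached value, fetching from the coordinator only on a cache miss.
    Long committedCached(String tp) {
        return cache.computeIfAbsent(tp, coordinatorOffsets::get);
    }

    // Always asks the authoritative store.
    Long committedFresh(String tp) {
        return coordinatorOffsets.get(tp);
    }

    public static void main(String[] args) {
        Map<String, Long> coordinator = new HashMap<>();
        coordinator.put("topic-0", 10L);
        CommittedOffsetCacheSketch consumer = new CommittedOffsetCacheSketch(coordinator);

        System.out.println("first lookup: " + consumer.committedCached("topic-0"));
        coordinator.put("topic-0", 25L); // commit made by someone else, bypassing the cache
        System.out.println("cached lookup: " + consumer.committedCached("topic-0"));
        System.out.println("fresh lookup: " + consumer.committedFresh("topic-0"));
    }
}
```

Any cache of this shape needs either an invalidation hook for every commit path or a fallback to the authoritative value, which is why the caching logic may need revisiting.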





[GitHub] [kafka] xiaocairush opened a new pull request, #13886: MINOR: Fix typos for group coordinator

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13886:
URL: https://github.com/apache/kafka/pull/13886

   *More detailed description of your change,
   Fix typos for group coordinator.*
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] xiaocairush opened a new pull request, #13885: MINOR: Fix typos for connect

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13885:
URL: https://github.com/apache/kafka/pull/13885

   *More detailed description of your change,
   Fix typos for connect*
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] xiaocairush opened a new pull request, #13884: MINOR: fix typos for client

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13884:
URL: https://github.com/apache/kafka/pull/13884

   *More detailed description of your change,
   
   Fix typos for client
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage

2023-06-20 Thread Abhijeet Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735279#comment-17735279
 ] 

Abhijeet Kumar commented on KAFKA-14953:


I am working on the unit tests and the PR should be available soon.

> Add metrics for tiered storage
> --
>
> Key: KAFKA-14953
> URL: https://issues.apache.org/jira/browse/KAFKA-14953
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Abhijeet Kumar
>Priority: Minor
>
> Not just for expired fetch. We also need to add all the metrics described in 
> KIP-405
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
>  
> ref: [https://github.com/apache/kafka/pull/13535#discussion_r1180286031] 





[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-20 Thread via GitHub


showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598673535

   @machi1990 , could you explain more about this:
   > The test neither calls commitSync, nor commitAsync which means that the 
cache is never updated in [2] after initially set in [1].
   
   If it doesn't commit anything, then the expected committed value should be 0, 
right? And if the expected values are all greater than 0, something must be 
committing offsets, right? It could be auto commit on the consumer 
side. (maybe?)





[GitHub] [kafka] drawxy commented on a diff in pull request #13847: KAFKA-15082: The log retention policy doesn't take effect after altering log dir

2023-06-20 Thread via GitHub


drawxy commented on code in PR #13847:
URL: https://github.com/apache/kafka/pull/13847#discussion_r1235172202


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1808,7 +1809,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
   // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  replicaAlterLogDirsManager.getFetcher(topicPartition) match {

Review Comment:
   In general, we should pause cleaning only once while altering the log dir. 
There is a counter (kafka.log.LogCleaningPaused) that tracks how many times the 
cleaning of a topic partition has been paused, and the fetcher decrements it by 
only one after the alteration completes. Cleaning is resumed only when the 
counter drops to 0. Currently, however, every time the broker receives a 
LeaderAndISR request for a partition whose log dir is being altered, it calls 
abortAndPauseCleaning. As a result, a single LeaderAndISR request for that 
partition during the alteration disables its log cleaning forever.
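
The counting behaviour described in this comment can be modelled with a short Java sketch. This is an assumption-laden illustration, not the LogManager or LogCleanerManager code: the class `PauseCounterSketch` and the methods `resumeCleaning` and `isCleaningPaused` are hypothetical names, and only `abortAndPauseCleaning` is taken from the diff. It shows how one extra pause call leaves the counter above zero after the single resume.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-partition pause counter; not Kafka's log cleaner code.
class PauseCounterSketch {
    private final Map<String, Integer> pausedCount = new HashMap<>();

    // Each call increments the pause counter for the partition.
    void abortAndPauseCleaning(String tp) {
        pausedCount.merge(tp, 1, Integer::sum);
    }

    // Each call decrements the counter; the entry disappears when it reaches zero.
    void resumeCleaning(String tp) {
        pausedCount.computeIfPresent(tp, (key, count) -> count > 1 ? count - 1 : null);
    }

    // Cleaning stays paused while the counter is above zero.
    boolean isCleaningPaused(String tp) {
        return pausedCount.containsKey(tp);
    }

    public static void main(String[] args) {
        PauseCounterSketch cleaner = new PauseCounterSketch();
        cleaner.abortAndPauseCleaning("topic-0"); // log dir alteration starts
        cleaner.abortAndPauseCleaning("topic-0"); // a later LeaderAndISR request pauses again
        cleaner.resumeCleaning("topic-0");        // the fetcher resumes once when the move completes
        System.out.println("still paused: " + cleaner.isCleaningPaused("topic-0"));
    }
}
```

Guarding the pause call so it runs only when no alteration is already in progress for the partition keeps the pause and resume calls balanced.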






[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-20 Thread via GitHub


machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598653771

   > @showuon I was looking into this and, after several local runs, I managed 
to eliminate some flaky tests and came up with the list of failures that are 
only caused by this change. The full set of failures that I've seen 
locally is:
   > 
   > ```
   > kafka.api.TransactionsBounceTest.testWithGroupId()
   > kafka.api.TransactionsBounceTest.testWithGroupMetadata()
   > kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[1]
   > kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[2]
   > kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[1]
   > kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[2]
   > kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[1]
   > kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[2]
   > kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[3]
   > kafka.server.DynamicConfigChangeUnitTest.testIpHandlerUnresolvableAddress()
   > kafka.zk.ZkMigrationIntegrationTest.testNewAndChangedTopicsInDualWrite(ClusterInstance)[1]
   > kafka.admin.ConfigCommandTest.shouldFailIfInvalidHost()
   > ```
   > 
   > Only the `TransactionsTest` and `TransactionsBounceTest` failures are ones 
that I've identified as related to this PR. I've started to investigate 
them, and so far my conclusion is that the failures are caused by reading 
stale cache values, because the cache item is stored only once, when fetching 
the offset in [1]. The test calls neither `commitSync` nor `commitAsync`, which 
means that the cache is never updated in [2] after being initially set in [1].
   > 
   > I was thinking of dropping the cache update when fetching committed 
offsets, i.e. in [1], and only updating the cache during offset commit [2].
   > 
   > 1. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1075
   > 2. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1456
   > 
   > That would align to the comment you raised in [#13665 
(comment)](https://github.com/apache/kafka/pull/13665#discussion_r1197621674) 
Let me know what you think.
   
   I've pushed this change in 
https://github.com/apache/kafka/pull/13665/commits/9539c559a782aba8ce95c9b8b48831c6879821d2
 





[GitHub] [kafka] lukestephenson-zendesk commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-20 Thread via GitHub


lukestephenson-zendesk commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1235166042


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional updateLatestMetadata(
 // Between the time that a topic is deleted and re-created, the client may lose track of the
 // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
 // topicId, we allow the corresponding leader epoch to override the last seen value.
-log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
- tp, newEpoch, oldTopicId, topicId);
+if (oldTopicId != null) {
+log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+tp, newEpoch, oldTopicId, topicId);
+} else {
+log.debug("Resetting the last seen epoch of partition {} to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   @jolshan Why did you end up discarding this change? I'm seeing lots of noise 
from my Kafka producers because this is still at info level. From its 
description, I wasn't expecting the next commit, [Reorganize 
logic](https://github.com/apache/kafka/pull/13447/commits/8c7549cece628e12654cb636b31283a3ba2d45fe), 
to make any functional changes, but it got rid of this change to log levels. 
Was that intended?  If this were logged at debug level, it would reduce a lot 
of noise for my producers.  Thanks.






[GitHub] [kafka] xiaocairush commented on pull request #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


xiaocairush commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-1598643673

   Sorry that this commit contains so many changes. Please review 
https://github.com/apache/kafka/pull/13882 first. Once that PR is merged, 
the changes here will be only in the doc module.





[GitHub] [kafka] xiaocairush opened a new pull request, #13883: MINOR: Fix typos for doc

2023-06-20 Thread via GitHub


xiaocairush opened a new pull request, #13883:
URL: https://github.com/apache/kafka/pull/13883

   *More detailed description of your change,
   
   Fix some typos  for documents
   
   *Summary of testing strategy (including rationale)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




