Re: [PR] MINOR: Fix GroupCoordinatorShardTest stubbing [kafka]

2023-11-14 Thread via GitHub


dajac merged PR #14637:
URL: https://github.com/apache/kafka/pull/14637


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-11-14 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786206#comment-17786206
 ] 

Henry Cai commented on KAFKA-15653:
---

Is this Jira resolved?  I saw all the three linked PRs are merged in already.

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Justine Olshan
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status

2023-11-14 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786204#comment-17786204
 ] 

Phuc Hong Tran commented on KAFKA-15809:


Nvm I got the answer 

> Adding TS to broker's features list and updating broker's metadata schema to 
> include TS enable status
> -
>
> Key: KAFKA-15809
> URL: https://issues.apache.org/jira/browse/KAFKA-15809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Phuc Hong Tran
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Currently controller doesn't have the visibility of all brokers's TS enable 
> status. As mentioned in KAFKA-15341, we need to add metadata about TS enable 
> status of brokers so that controller can check for these status before 
> enabling TS per topic



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]

2023-11-14 Thread via GitHub


yashmayya merged PR #14704:
URL: https://github.com/apache/kafka/pull/14704


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]

2023-11-14 Thread via GitHub


yashmayya commented on PR #14704:
URL: https://github.com/apache/kafka/pull/14704#issuecomment-1811862334

   Thanks Chris! Test failures appear unrelated, merging to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]

2023-11-14 Thread via GitHub


yashmayya commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1393673421


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##
@@ -108,6 +108,19 @@ public interface Herder {
  */
 void putConnectorConfig(String connName, Map config, 
boolean allowReplace, Callback> callback);
 
+/**
+ * Set the configuration for a connector, along with a target state 
optionally. This supports creation and updating.
+ * @param connName name of the connector
+ * @param config the connector's configuration
+ * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+ *target state is {@link TargetState#STARTED} if no 
target state exists previously
+ * @param allowReplace if true, allow overwriting previous configs; if 
false, throw {@link AlreadyExistsException}
+ * if a connector with the same name already exists
+ * @param callback callback to invoke when the configuration has been 
written
+ */
+void putConnectorConfig(String connName, Map config, 
TargetState targetState, boolean allowReplace,

Review Comment:
   I agree, that makes sense to me  



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##
@@ -108,6 +108,19 @@ public interface Herder {
  */
 void putConnectorConfig(String connName, Map config, 
boolean allowReplace, Callback> callback);
 
+/**
+ * Set the configuration for a connector, along with a target state 
optionally. This supports creation and updating.
+ * @param connName name of the connector
+ * @param config the connector's configuration
+ * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+ *target state is {@link TargetState#STARTED} if no 
target state exists previously
+ * @param allowReplace if true, allow overwriting previous configs; if 
false, throw {@link AlreadyExistsException}
+ * if a connector with the same name already exists
+ * @param callback callback to invoke when the configuration has been 
written
+ */
+void putConnectorConfig(String connName, Map config, 
TargetState targetState, boolean allowReplace,

Review Comment:
   I agree, that makes sense to me  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15445: Add JVM Docker image [kafka]

2023-11-14 Thread via GitHub


VedarthConfluent commented on code in PR #14552:
URL: https://github.com/apache/kafka/pull/14552#discussion_r1393649057


##
docker/docker_release.py:
##
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python script to build and push docker image
+Usage: docker_release.py
+
+Interactive utility to push the docker image to dockerhub
+"""
+
+import subprocess
+from distutils.dir_util import copy_tree
+from datetime import date
+import shutil
+
+def push_jvm(image, kafka_url):
+copy_tree("resources", "jvm/resources")
+subprocess.run(["docker", "buildx", "build", "-f", "jvm/Dockerfile", 
"--build-arg", f"kafka_url={kafka_url}", "--build-arg", 
f"build_date={date.today()}",
+"--push",
+"--platform", "linux/amd64,linux/arm64",
+"--tag", image, "jvm"])
+shutil.rmtree("jvm/resources")
+
+def login():
+status = subprocess.run(["docker", "login"])
+if status.returncode != 0:
+print("Docker login failed, aborting the docker release")
+raise PermissionError
+
+def create_builder():
+subprocess.run(["docker", "buildx", "create", "--name", "kafka-builder", 
"--use"])
+
+def remove_builder():
+subprocess.run(["docker", "buildx", "rm", "kafka-builder"])
+
+if __name__ == "__main__":

Review Comment:
   We have decided to keep the release and promotion process to be driven by 
local scripts for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393640277


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1355,6 +1402,60 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicsRequest(request: RequestChannel.Request): Unit = {
+val describeTopicsRequest = request.body[DescribeTopicsRequest]
+
+val topics = scala.collection.mutable.Map[String, Int]()
+describeTopicsRequest.data.topics.forEach { topic =>
+  if (topic.name == null || topic.firstPartitionId() < 0) {
+throw new InvalidRequestException(s"Topic name and first partition id 
must be set.")
+  }
+  topics.put(topic.name(), topic.firstPartitionId())
+}
+
+val fetchAllTopics = topics.isEmpty

Review Comment:
   If there is no topic in the request, it means the client wants all the 
topics.
   firstPartitionIndex is a topic level argument. It only works in the case 
where topics are provided.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393639171


##
clients/src/main/resources/common/message/DescribeTopicsRequest.json:
##
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 73,
+  "type": "request",
+  "listeners": ["broker"],
+  "name": "DescribeTopicsRequest",

Review Comment:
   Updated the name. Thanks for the advice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393638945


##
clients/src/main/resources/common/message/DescribeTopicsResponse.json:
##
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 73,
+  "type": "response",
+  "name": "DescribeTopicsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "Topics", "type": "[]DescribeTopicsResponseTopic", "versions": 
"0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "ErrorCode", "type": "int16", "versions": "0+",
+"about": "The topic error, or 0 if there was no error." },
+  { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+"about": "The topic name." },
+  { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+  { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+"about": "True if the topic is internal." },
+  { "name": "Partitions", "type": "[]DescribeTopicsResponsePartition", 
"versions": "0+",
+"about": "Each partition in the topic.", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error, or 0 if there was no error." },
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the leader broker." },
+{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+  "about": "The leader epoch of this partition." },
+{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of all nodes that host this partition." },
+{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of nodes that are in sync with the leader for this 
partition." },
+{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The new eligible leader replicas otherwise." },
+{ "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The last known ELR." },
+{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", 
"ignorable": true, "entityType": "brokerId",
+  "about": "The set of offline replicas of this partition." }]},
+  { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
"0+", "default": "-2147483648",
+"about": "32-bit bitfield to represent authorized operations for this 
topic." },
+  { "name": "NextPartition", "type": "int32", "versions": "0+", "default": 
"-1",
+"about": "The first partition that exceed the request limit. " }]}

Review Comment:
   Correct, it should be empty in most of the cases. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393636842


##
clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicsRequest.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicsRequestData;
+import org.apache.kafka.common.message.DescribeTopicsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class DescribeTopicsRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder {
+private final DescribeTopicsRequestData data;
+
+public Builder(DescribeTopicsRequestData data) {
+super(ApiKeys.DESCRIBE_TOPICS);
+this.data = data;
+}
+
+public Builder(List topics) {
+super(ApiKeys.DESCRIBE_TOPICS, 
ApiKeys.DESCRIBE_TOPICS.oldestVersion(), 
ApiKeys.DESCRIBE_TOPICS.latestVersion());
+DescribeTopicsRequestData data = new DescribeTopicsRequestData();
+topics.forEach(topicName -> data.topics().add(new 
DescribeTopicsRequestData.TopicRequest().setName(topicName)));
+this.data = data;
+}
+
+@Override
+public DescribeTopicsRequest build(short version) {
+return new DescribeTopicsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+
+}
+
+private final DescribeTopicsRequestData data;
+
+public DescribeTopicsRequest(DescribeTopicsRequestData data) {
+super(ApiKeys.DESCRIBE_TOPICS, (short) 0);
+this.data = data;
+}
+
+public DescribeTopicsRequest(DescribeTopicsRequestData data, short 
version) {
+super(ApiKeys.DESCRIBE_TOPICS, version);
+this.data = data;
+}
+
+@Override
+public DescribeTopicsRequestData data() {
+return data;
+}
+
+@Override
+public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+Errors error = Errors.forException(e);
+DescribeTopicsResponseData responseData = new 
DescribeTopicsResponseData();
+for (DescribeTopicsRequestData.TopicRequest topic : data.topics()) {

Review Comment:
   Can you help elaborate here? what kind of top-level error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393636378


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +387,8 @@ public enum Errors {
 STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new),
 MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the 
wrong type.", MismatchedEndpointTypeException::new),
 UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", 
UnsupportedEndpointTypeException::new),
-UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new);
+UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
+REQUEST_LIMIT_REACHED(117, "The request has reached the limit.", 
RequestLimitReachedException::new);

Review Comment:
   Yes. Updated the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393634656


##
clients/src/main/java/org/apache/kafka/common/errors/RequestLimitReachedException.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class RequestLimitReachedException extends ApiException {

Review Comment:
   Yes. Comments added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393633978


##
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java:
##
@@ -114,6 +114,7 @@ public enum ApiKeys {
 CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT),
 CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE),
 CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION),
+DESCRIBE_TOPICS(ApiMessageType.DESCRIBE_TOPICS),

Review Comment:
   Sorry, it is misaligned in what way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393633762


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -116,6 +117,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", 
config)
   val configManager = new ConfigAdminManager(brokerId, config, 
configRepository)
+  val partitionRequestLimit = 2000

Review Comment:
   Sure, added the config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] WIP: initial simple and incomplete implementation [kafka]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #14144:
URL: https://github.com/apache/kafka/pull/14144#issuecomment-1811755757

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DRAFT] EXAMPLE: dual write recipe example for KIP-939 [kafka]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #14231:
URL: https://github.com/apache/kafka/pull/14231#issuecomment-1811755708

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15829) How to build Kafka 2.7 with maven instead of gradle?

2023-11-14 Thread rain.liang (Jira)
rain.liang created KAFKA-15829:
--

 Summary: How to build Kafka 2.7 with maven instead of gradle?
 Key: KAFKA-15829
 URL: https://issues.apache.org/jira/browse/KAFKA-15829
 Project: Kafka
  Issue Type: Wish
Affects Versions: 2.7.2
Reporter: rain.liang


It's difficult to upgrade the version of gradle in kafka building. Is there a 
solution to build kafka 2.7 with maven?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-14 Thread via GitHub


mjsax commented on PR #14596:
URL: https://github.com/apache/kafka/pull/14596#issuecomment-1811669805

   Jenkins did crash. Re-started the build. Can merge after we got a green run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1811620894

   @lucasbru As @philipnee said, the two mechanisms appear similar and do have 
overlap. However, not only is it "awkward" to mix them, but it raises 
correctness questions.
   
   `poll()` processes  _all_ of the events in the queue from the background 
thread. However, when the user calls `commit`*, we want to execute only the 
commit callbacks. As a result, we would have to _filter and remove_ the 
relevant events from the background thread.
   
   The questions that arose:
   
   * What if, when processing the commit callbacks, we see error events in the 
queue? Do we skip them? Execute them? Or something else?
   * Will removing the events from the background event queue cause unforeseen 
issues later when calling `poll()`?
   
   The answer may be that it's totally fine. We kind of "punted" by doing it 
this way, not only because of those questions, but because it seemed a bit 
awkward 路‍♂️ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393479827


##
clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicsRequest.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicsRequestData;
+import org.apache.kafka.common.message.DescribeTopicsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class DescribeTopicsRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder {
+private final DescribeTopicsRequestData data;
+
+public Builder(DescribeTopicsRequestData data) {
+super(ApiKeys.DESCRIBE_TOPICS);
+this.data = data;
+}
+
+public Builder(List topics) {
+super(ApiKeys.DESCRIBE_TOPICS, 
ApiKeys.DESCRIBE_TOPICS.oldestVersion(), 
ApiKeys.DESCRIBE_TOPICS.latestVersion());
+DescribeTopicsRequestData data = new DescribeTopicsRequestData();
+topics.forEach(topicName -> data.topics().add(new 
DescribeTopicsRequestData.TopicRequest().setName(topicName)));
+this.data = data;
+}
+
+@Override
+public DescribeTopicsRequest build(short version) {
+return new DescribeTopicsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+
+}
+
+private final DescribeTopicsRequestData data;
+
+public DescribeTopicsRequest(DescribeTopicsRequestData data) {
+super(ApiKeys.DESCRIBE_TOPICS, (short) 0);
+this.data = data;
+}
+
+public DescribeTopicsRequest(DescribeTopicsRequestData data, short 
version) {
+super(ApiKeys.DESCRIBE_TOPICS, version);
+this.data = data;
+}
+
+@Override
+public DescribeTopicsRequestData data() {
+return data;
+}
+
+@Override
+public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+Errors error = Errors.forException(e);
+DescribeTopicsResponseData responseData = new 
DescribeTopicsResponseData();
+for (DescribeTopicsRequestData.TopicRequest topic : data.topics()) {

Review Comment:
   Perhaps this is telling us that we need a top-level error in the response.



##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +387,8 @@ public enum Errors {
 STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new),
 MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the 
wrong type.", MismatchedEndpointTypeException::new),
 UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", 
UnsupportedEndpointTypeException::new),
-UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new);
+UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
+REQUEST_LIMIT_REACHED(117, "The request has reached the limit.", 
RequestLimitReachedException::new);

Review Comment:
   The phrasing is very vague. I guess it is a request _size_ limit?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1355,6 +1402,60 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicsRequest(request: RequestChannel.Request): Unit = {
+val describeTopicsRequest = request.body[DescribeTopicsRequest]
+
+val topics = scala.collection.mutable.Map[String, Int]()
+describeTopicsRequest.data.topics.forEach { topic =>
+  if (topic.name == null || topic.firstPartitionId() < 0) {
+throw new InvalidRequestException(s"Topic name and first partition id 
must be set.")
+  }
+  

[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse

2023-11-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15828:

Labels: needs-kip  (was: )

> Protect clients from broker hostname reuse
> --
>
> Key: KAFKA-15828
> URL: https://issues.apache.org/jira/browse/KAFKA-15828
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, producer 
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> In some environments such as k8s, brokers may be assigned to nodes 
> dynamically from an available pool. When a cluster is rolling, it is possible 
> for the client to see the same node advertised for different broker IDs in a 
> short period of time. For example, kafka-1 might be initially assigned to 
> node1. Before the client is able to establish a connection, it could be that 
> kafka-3 is now on node1 instead. Currently there is no protection in the 
> client or in the protocol for this scenario. If the connection succeeds, the 
> client will assume it has a good connection to kafka-1. Until something 
> disrupts the connection, it will continue under this assumption even if the 
> hostname for kafka-1 changes.
> We have observed this scenario in practice. The client connected to the wrong 
> broker through stale hostname information. It was unable to produce data 
> because of persistent NOT_LEADER errors. The only way to recover in the end 
> was by restarting the client to force a reconnection.
> We have discussed a couple potential solutions to this problem:
>  # Let the client be smarter managing the connection/hostname mapping. When 
> it detects that a hostname has changed, it should force a disconnect to 
> ensure it connects to the right node.
>  # We can modify the protocol to verify that the client has connected to the 
> intended broker. For example, we can add a field to ApiVersions to indicate 
> the intended broker ID. The broker receiving the request can return an error 
> if its ID does not match that in the request.
> Are there alternatives? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse

2023-11-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15828:

Component/s: clients
 consumer
 producer 

> Protect clients from broker hostname reuse
> --
>
> Key: KAFKA-15828
> URL: https://issues.apache.org/jira/browse/KAFKA-15828
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, producer 
>Reporter: Jason Gustafson
>Priority: Major
>
> In some environments such as k8s, brokers may be assigned to nodes 
> dynamically from an available pool. When a cluster is rolling, it is possible 
> for the client to see the same node advertised for different broker IDs in a 
> short period of time. For example, kafka-1 might be initially assigned to 
> node1. Before the client is able to establish a connection, it could be that 
> kafka-3 is now on node1 instead. Currently there is no protection in the 
> client or in the protocol for this scenario. If the connection succeeds, the 
> client will assume it has a good connection to kafka-1. Until something 
> disrupts the connection, it will continue under this assumption even if the 
> hostname for kafka-1 changes.
> We have observed this scenario in practice. The client connected to the wrong 
> broker through stale hostname information. It was unable to produce data 
> because of persistent NOT_LEADER errors. The only way to recover in the end 
> was by restarting the client to force a reconnection.
> We have discussed a couple potential solutions to this problem:
>  # Let the client be smarter managing the connection/hostname mapping. When 
> it detects that a hostname has changed, it should force a disconnect to 
> ensure it connects to the right node.
>  # We can modify the protocol to verify that the client has connected to the 
> intended broker. For example, we can add a field to ApiVersions to indicate 
> the intended broker ID. The broker receiving the request can return an error 
> if its ID does not match that in the request.
> Are there alternatives? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1811599069

   @philipnee can you tag with `ctr` and `KIP-848` 梁 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15276) Implement event plumbing for ConsumerRebalanceListener callbacks

2023-11-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15276:
--
Summary: Implement event plumbing for ConsumerRebalanceListener callbacks  
(was: Implement partition assignment reconciliation callbacks)

> Implement event plumbing for ConsumerRebalanceListener callbacks
> 
>
> Key: KAFKA-15276
> URL: https://issues.apache.org/jira/browse/KAFKA-15276
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Provide the Java client support for the consumer group partition assignment 
> logic, including:
>  * Calculate the difference between the current partition assignment and that 
> returned in the {{ConsumerGroupHeartbeatResponse}} RPC response
>  * Ensure we handle the case where changes to the assignment take multiple 
> passes of {{RequestManager.poll()}}
>  * Integrate the mechanism to invoke the user’s rebalance callback
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15276) Implement partition assignment reconciliation callbacks

2023-11-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15276:
--
Summary: Implement partition assignment reconciliation callbacks  (was: 
Complete partition assignment reconciliation callbacks)

> Implement partition assignment reconciliation callbacks
> ---
>
> Key: KAFKA-15276
> URL: https://issues.apache.org/jira/browse/KAFKA-15276
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Provide the Java client support for the consumer group partition assignment 
> logic, including:
>  * Calculate the difference between the current partition assignment and that 
> returned in the {{ConsumerGroupHeartbeatResponse}} RPC response
>  * Ensure we handle the case where changes to the assignment take multiple 
> passes of {{RequestManager.poll()}}
>  * Integrate the mechanism to invoke the user’s rebalance callback
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on PR #14697:
URL: https://github.com/apache/kafka/pull/14697#issuecomment-1811585213

   @ex172000 @junrao Thanks for reviewing. Please take another look when you 
have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan merged PR #14753:
URL: https://github.com/apache/kafka/pull/14753


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811579631

   Update on test failures:
   
   - `kafka.api.ClientIdQuotaTest`
 - `testQuotaOverrideDelete`: ran 50 times locally with no errors
 - `testThrottledProducerConsumer`: ran 75 times locally with no errors
   - `org.apache.kafka.connect.integration.StartAndStopLatchTest`
 - `shouldReturnFalseWhenAwaitingForStopToNeverComplete`: ran 400 times 
locally with no errors
   
   Closing and re-opening to trigger build...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on PR #14753:
URL: https://github.com/apache/kafka/pull/14753#issuecomment-1811575702

   Given the scope of the change, these test failures are not related. Will 
followup elsewhere.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on code in PR #14697:
URL: https://github.com/apache/kafka/pull/14697#discussion_r1393453106


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -818,9 +818,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
 
appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
 appendInfo.setLastOffset(offset.value - 1)
-
appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats)
-if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
-  
appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)

Review Comment:
   Yes, this was a mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15824: SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]

2023-11-14 Thread via GitHub


wcarlson5 merged PR #14757:
URL: https://github.com/apache/kafka/pull/14757


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15824: SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]

2023-11-14 Thread via GitHub


wcarlson5 commented on PR #14757:
URL: https://github.com/apache/kafka/pull/14757#issuecomment-1811568924

   Execution failed for task ':streams:upgrade-system-tests-0101:test'.
   is unrelated to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on code in PR #14697:
URL: https://github.com/apache/kafka/pull/14697#discussion_r1393447344


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -399,12 +402,20 @@ class LogValidatorTest {
 assertEquals(i, offsetCounter.value);
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
-assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+
+val expectedShallowOffsetOfMaxTimestamp = if (magic >= 
RecordVersion.V2.value) {

Review Comment:
   If the behavior changes, breaking the test seems correct? Very unlikely we 
would ever go back to a non-batching format in any case, so this seems like a 
reasonable default assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]

2023-11-14 Thread via GitHub


gharris1727 opened a new pull request, #14764:
URL: https://github.com/apache/kafka/pull/14764

   These tests contain typos that leak sockets, mostly through clients that are 
left open.
   
   The WorkerTest had rather involved resource leaks, which arose because the 
typical lifetime of the WorkerTask was partially or totally mocked. The 
WorkerTask subclass constructors accept clients as arguments, and take 
ownership of those clients, and the partial mocking prevented those clients 
from being closed appropriately.
   
   Now, all of (WorkerSourceTask, ExactlyOnceWorkerSourceTask, WorkerSinkTask) 
have their constructors mocked, and those constructors close the passed-in 
resources immediately. This also allows the test to avoid waiting for some 
cancellation timeouts to expire, making the test faster to run.
   
   The other systematic typo that was present was the OffsetStore not being 
configured or closed, which caused a consumer to be leaked. This is because the 
constructor is instantiated in the factory method (e.g. 
offsetStoreForExactlyOnceSourceTask), but the KafkaOffsetBackingStore.offsetLog 
field field is only initialized inside of configure(WorkerConfig). Rather than 
making OffsetBackingStore implementations close the consumer even when 
configure is not called, I made the test use a more realistic lifecycle and 
actually call configure(WorkerConfig) and stop().
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0

Review Comment:
   I missed one :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-14 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391737904


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
   This is an existing issue, but `getErrorResponse` can just be 
`errorResponse`.



##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());

Review Comment:
   merge with previous line?



##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811460391

   @dajac—here are the unique test failures with notes:
   
   - integration.kafka.server.FetchFromFollowerIntegrationTest: 
testRackAwareRangeAssignor(String).quorum=kraft
 - KAFKA-15020
   - integration.kafka.server.FetchFromFollowerIntegrationTest: 
testRackAwareRangeAssignor(String).quorum=zk
 - KAFKA-15020, same as above
   - kafka.api.ClientIdQuotaTest: testQuotaOverrideDelete(String).quorum=kraft
 - KAFKA-8107
 - **_Will investigate_**
   - kafka.api.ClientIdQuotaTest: 
testThrottledProducerConsumer(String).quorum=zk
 - KAFKA-8108
 - **_Will investigate_**
   - kafka.api.GroupEndToEndAuthorizationTest: 
testAuthentications(String).quorum=kraft
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=kafka.api.GroupEndToEndAuthorizationTest=testAuthentications(String)%5B2%5D)
  
   - kafka.api.TransactionsTest: testBumpTransactionalEpoch(String).quorum=kraft
 - KAFKA-15099
   - org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest: 
testMultiWorkerRestartOnlyConnector
 - KAFKA-15675
   - org.apache.kafka.connect.integration.StartAndStopLatchTest: 
shouldReturnFalseWhenAwaitingForStopToNeverComplete
 - **_Will investigate_**
   - 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest:
 testReplication()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest=testReplication())
   - org.apache.kafka.controller.QuorumControllerTest: 
testBalancePartitionLeaders()
 - KAFKA-15052, marked as fixed
   - org.apache.kafka.controller.QuorumControllerTest: 
testFenceMultipleBrokers()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.controller.QuorumControllerTest=testFenceMultipleBrokers())
   - 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest:
 testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest=testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs())
   - org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest: 
shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
 - KAFKA-9868, marked as fixed
   - org.apache.kafka.streams.integration.IQv2StoreIntegrationTest; 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_WINDOW, kind=DSL]
 - KAFKA-13714, marked as fixed
   - org.apache.kafka.streams.integration.RestoreIntegrationTest: 
shouldInvokeUserDefinedGlobalStateRestoreListener()
 - KAFKA-15659, marked as fixed
   - org.apache.kafka.tools.MetadataQuorumCommandTest: [1] Type=Raft-Combined, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV1, 
Security=PLAINTEXT
 - KAFKA-15104
   - org.apache.kafka.trogdor.coordinator.CoordinatorTest: 
testTaskRequestWithOldStartMsGetsUpdated()
 - KAFKA-8115


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0

Review Comment:
   I missed one :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368202


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0
 val tp = new TopicPartition(topic, partition)
 createTopic(topic, 1, 1)
 
-TestUtils.waitUntilTrue(() => {
-  this.zkClient.topicExists(topic)
-}, "Failed to create topic")
-

Review Comment:
   I'm not convinced we ever did. Again, this is ZK-specific. The tests work on 
both variants without this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393367240


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   Perhaps, but I want to be able to have a different array for each test to 
enable them to be turned on individually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393365674


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(numMessages - records.count, 
lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be 
${numMessages - records.count}")
   }
 
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): 
Unit = {
 val numRecords = 1000
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 consumer.assign(List(tp).asJava)
 consumer.seek(tp, 0)
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
-def assertNoMetric(broker: KafkaServer, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
-val metricName = broker.metrics.metricName("throttle-time",
-  quotaType.toString,
-  "",
-  "user", "",
-  "client-id", clientId)
-assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
+def assertNoMetric(broker: KafkaBroker, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
+  val metricName = broker.metrics.metricName("throttle-time",
+quotaType.toString,
+"",
+"user", "",
+"client-id", clientId)
+  assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
 }
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
 
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))

Review Comment:
   In this context, `servers` and `brokers` are interchangeable. This change 
makes the test work for ZK or KRaft. Previously, it was ZK-only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]

2023-11-14 Thread via GitHub


dongnuo123 commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1393317417


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -547,7 +565,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(2, parts.size)
   }
 
-  @Test
+  @Test // TODO: doesn't pass for kraft and kraft+kip848

Review Comment:
   `consumer.partitionsFor("non-exist-topic")` is currently not supported in 
the new protocol



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15828) Protect clients from broker hostname reuse

2023-11-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-15828:
---

 Summary: Protect clients from broker hostname reuse
 Key: KAFKA-15828
 URL: https://issues.apache.org/jira/browse/KAFKA-15828
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In some environments such as k8s, brokers may be assigned to nodes dynamically 
from an available pool. When a cluster is rolling, it is possible for the 
client to see the same node advertised for different broker IDs in a short 
period of time. For example, kafka-1 might be initially assigned to node1. 
Before the client is able to establish a connection, it could be that kafka-3 
is now on node1 instead. Currently there is no protection in the client or in 
the protocol for this scenario. If the connection succeeds, the client will 
assume it has a good connection to kafka-1. Until something disrupts the 
connection, it will continue under this assumption even if the hostname for 
kafka-1 changes.

We have observed this scenario in practice. The client connected to the wrong 
broker through stale hostname information. It was unable to produce data 
because of persistent NOT_LEADER errors. The only way to recover in the end was 
by restarting the client to force a reconnection.

We have discussed a couple potential solutions to this problem:
 # Let the client be smarter managing the connection/hostname mapping. When it 
detects that a hostname has changed, it should force a disconnect to ensure it 
connects to the right node.
 # We can modify the protocol to verify that the client has connected to the 
intended broker. For example, we can add a field to ApiVersions to indicate the 
intended broker ID. The broker receiving the request can return an error if its 
ID does not match that in the request.

Are there alternatives? 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]

2023-11-14 Thread via GitHub


dongnuo123 commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1393317417


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -547,7 +565,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(2, parts.size)
   }
 
-  @Test
+  @Test // TODO: doesn't pass for kraft and kraft+kip848

Review Comment:
   `consumer.partitionsFor("non-exist-topic")` is currently not supported in 
kraft



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393305110


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   Is it possible to have a different type of 'source' that can be defined once 
vs. on each test? More of a nit, but curious.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0
 val tp = new TopicPartition(topic, partition)
 createTopic(topic, 1, 1)
 
-TestUtils.waitUntilTrue(() => {
-  this.zkClient.topicExists(topic)
-}, "Failed to create topic")
-

Review Comment:
   Why don't we need this now?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(numMessages - records.count, 
lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be 
${numMessages - records.count}")
   }
 
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): 
Unit = {
 val numRecords = 1000
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 consumer.assign(List(tp).asJava)
 consumer.seek(tp, 0)
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
-def assertNoMetric(broker: KafkaServer, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
-val metricName = broker.metrics.metricName("throttle-time",
-  quotaType.toString,
-  "",
-  "user", "",
-  "client-id", clientId)
-assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
+def assertNoMetric(broker: KafkaBroker, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
+  val metricName = broker.metrics.metricName("throttle-time",
+quotaType.toString,
+"",
+"user", "",
+"client-id", clientId)
+  assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
 }
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
 
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))

Review Comment:
   So this is incorrect in `trunk`, right?



##

[PR] KAFKA-15827: Prevent KafkaBasedLog subclasses from leaking passed-in clients [kafka]

2023-11-14 Thread via GitHub


gharris1727 opened a new pull request, #14763:
URL: https://github.com/apache/kafka/pull/14763

   The KafkaBasedLog normally creates clients during start() and closes them in 
stop().
   Some KafkaBasedLog subclasses accept already-created clients, and close them 
in stop() if start() is called first.
   These clients should also be closed if stop() is called without first 
calling start().
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15827) KafkaBasedLog.withExistingClients leaks clients if start is not called

2023-11-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15827:
---

 Summary: KafkaBasedLog.withExistingClients leaks clients if start 
is not called
 Key: KAFKA-15827
 URL: https://issues.apache.org/jira/browse/KAFKA-15827
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.6.0
Reporter: Greg Harris
Assignee: Greg Harris


The KafkaBasedLog base implementation creates consumers and producers, and 
closes them after they are instantiated. There are subclasses of the 
KafkaBasedLog which accept pre-created consumers and producers, and have the 
responsibility for closing the clients when the KafkaBasedLog is stopped.

It appears that the KafkaBasedLog subclasses do not close the clients when 
start() is skipped and stop() is called directly. This happens in a few tests, 
and causes the passed-in clients to be leaked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-14 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1393218354


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   Thanks! I agree. I'm going to implement this in the `StoreChangelogReader` 
adding a new argument for the `#unregister ` method and cover the case for 
`MIGRATED`



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   Thanks! I agree. I'm going to implement this in the `StoreChangelogReader` 
adding a new argument for the `#unregister` method and cover the case for 
`MIGRATED`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-14 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1393203579


##
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java:
##
@@ -97,21 +128,21 @@ protected Materialized(final Materialized 
materialized) {
 this.cachingEnabled = materialized.cachingEnabled;
 this.topicConfig = materialized.topicConfig;
 this.retention = materialized.retention;
-this.storeType = materialized.storeType;
+this.storeSuppliers = materialized.storeSuppliers;
 }
 
 /**
- * Materialize a {@link StateStore} with the given {@link StoreType}.
+ * Materialize a {@link StateStore} with the given {@link 
DslStoreSuppliers}.
  *
- * @param storeType  the type of the state store
- * @paramkey type of the store
- * @paramvalue type of the store
- * @paramtype of the {@link StateStore}
+ * @param storeSuppliers  the type of the state store
+ * @param  key type of the store
+ * @param  value type of the store
+ * @param  type of the {@link StateStore}
  * @return a new {@link Materialized} instance with the given storeName
  */
-public static  Materialized as(final 
StoreType storeType) {
-Objects.requireNonNull(storeType, "store type can't be null");
-return new Materialized<>(storeType);
+public static  Materialized as(final 
DslStoreSuppliers storeSuppliers) {

Review Comment:
   yup!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-14 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1393194762


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
 public static final String ROCKS_DB = "rocksDB";
 public static final String IN_MEMORY = "in_memory";
 
+/** {@code dsl.store.suppliers.class } */
+public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = 
"dsl.store.suppliers.class";
+public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which 
store implementations to plug in to DSL operators. Must implement the 
org.apache.kafka.streams.state.DslStoreSuppliers interface.";

Review Comment:
   kept package private so it can be used in `TopologyConfig` as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2023-11-14 Thread via GitHub


vamossagar12 commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-1811121235

   Hey @sean-rossignol , thanks for explaining your use case and how this can 
be useful for you! I hope to push it in the upcoming 3.7 release. I haven't 
gotten the chance to resolve the merge conflicts. Will try to do so this week 
and maybe you could help with the reviews?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-11-14 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786046#comment-17786046
 ] 

Proven Provenzano commented on KAFKA-15513:
---

The role of "{{{}kafka-storage --add-scram{}}}" is to allow brokers to use 
SCRAM to talk with each other without having to use another authentication 
scheme to bootstrap.

 

Using ZK the process of getting broker to broker authentication to use SCRAM 
involved setting up ZK first, then adding the SCRAM credentials directly to ZK, 
and then starting the brokers up.

In KRAFT you cannot add SCRAM credentials directly to controllers so to 
facilitate the bootstrap we allow metadata records to be added at Controller 
initialization. 

 

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15825) KRaft controller writes empty state to ZK after migration

2023-11-14 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15825:
-
Component/s: kraft

> KRaft controller writes empty state to ZK after migration
> -
>
> Key: KAFKA-15825
> URL: https://issues.apache.org/jira/browse/KAFKA-15825
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Immediately following the ZK migration, there is a race condition where the 
> KRaftMigrationDriver can use an empty MetadataImage when performing the full 
> "SYNC_KRAFT_TO_ZK" reconciliation. 
> After the next controller failover, or when the controller loads a metadata 
> snapshot, the correct state will be written to ZK. 
> The symptom of this bug is that we see the migration complete, and then all 
> the metadata removed from ZK. For example, 
> {code}
> [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper 
> to KRaft. 573 records were generated in 2204 ms across 51 batches. The record 
> types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, 
> PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an 
> epoch of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5].
> {code}
> immediately followed by:
> {code}
> [KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling 
> with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41}
> {code}
> If affected by this, a quick workaround is to cause the controller to 
> failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15818) Implement max poll internval

2023-11-14 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15818:
---
Labels: consumer consumer-threading-refactor  (was: )

> Implement max poll internval
> 
>
> Key: KAFKA-15818
> URL: https://issues.apache.org/jira/browse/KAFKA-15818
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Blocker
>  Labels: consumer, consumer-threading-refactor
>
> The consumer needs to be polled at a candance lower than 
> MAX_POLL_INTERVAL_MAX otherwise the consumer should try to leave the group.  
> Currently, we send an acknowledgment event to the network thread per poll.  
> The event only triggers update on autocommit state, we need to implement 
> updating the poll timer so that the consumer can leave the group when the 
> timer expires. 
>  
> The current logic looks like this:
> {code:java}
>  if (heartbeat.pollTimeoutExpired(now)) {
> // the poll timeout has expired, which means that the foreground thread 
> has stalled
> // in between calls to poll().
> log.warn("consumer poll timeout has expired. This means the time between 
> subsequent calls to poll() " +
> "was longer than the configured max.poll.interval.ms, which typically 
> implies that " +
> "the poll loop is spending too much time processing messages. You can 
> address this " +
> "either by increasing max.poll.interval.ms or by reducing the maximum 
> size of batches " +
> "returned in poll() with max.poll.records.");
> maybeLeaveGroup("consumer poll timeout has expired.");
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]

2023-11-14 Thread via GitHub


gharris1727 opened a new pull request, #14762:
URL: https://github.com/apache/kafka/pull/14762

   Currently the only place that the consumer is closed is on the task thread, 
which may be blocked indefinitely by the task plugin. Similar to 
AbstractWorkerSourceTask, we should close the consumer here to allow for 
cleanup to take place.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1393093702


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
 actionQueue.add {
-  () =>
-allResults.foreach { case (topicPartition, result) =>
-  val requestKey = TopicPartitionOperationKey(topicPartition)
-  result.info.leaderHwChange match {
-case LeaderHwChange.INCREASED =>
-  // some delayed operations may be unblocked after HW changed
-  delayedProducePurgatory.checkAndComplete(requestKey)
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.SAME =>
-  // probably unblock some follower fetch requests since log end 
offset has been updated
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.NONE =>
-// nothing
+  () => allResults.foreach { case (topicPartition, result) =>
+val requestKey = TopicPartitionOperationKey(topicPartition)
+result.info.leaderHwChange match {
+  case LeaderHwChange.INCREASED =>
+// some delayed operations may be unblocked after HW changed
+delayedProducePurgatory.checkAndComplete(requestKey)
+delayedFetchPurgatory.checkAndComplete(requestKey)
+delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.SAME =>
+// probably unblock some follower fetch requests since log end 
offset has been updated
+delayedFetchPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.NONE =>
+  // nothing

Review Comment:
   I think it was the same here, but I can move it. 
https://github.com/apache/kafka/commit/7d147cf2413e5d361422728e5c9306574658c78d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15816: Fix leaked sockets in mirror [kafka]

2023-11-14 Thread via GitHub


gharris1727 opened a new pull request, #14761:
URL: https://github.com/apache/kafka/pull/14761

   These socket leaks were caused by typos, either an extra call to start() or 
a missing call to close().
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15826) WorkerSinkTask leaks Consumer if plugin start or stop blocks indefinitely

2023-11-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15826:
---

 Summary: WorkerSinkTask leaks Consumer if plugin start or stop 
blocks indefinitely
 Key: KAFKA-15826
 URL: https://issues.apache.org/jira/browse/KAFKA-15826
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.6.0
Reporter: Greg Harris
Assignee: Greg Harris


The WorkerSourceTask cancel() method closes the Producer, releasing it's 
resources. The WorkerSInkTask does not do the same for the Consumer, as it does 
not override the cancel() method.

WorkerSinkTask should close the consumer if the task is cancelled, as progress 
for a cancelled task will be discarded anyway. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15825) KRaft controller writes empty state to ZK after migration

2023-11-14 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15825.
--
Resolution: Fixed

This bug was fixed as part of KAFKA-15605

> KRaft controller writes empty state to ZK after migration
> -
>
> Key: KAFKA-15825
> URL: https://issues.apache.org/jira/browse/KAFKA-15825
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Immediately following the ZK migration, there is a race condition where the 
> KRaftMigrationDriver can use an empty MetadataImage when performing the full 
> "SYNC_KRAFT_TO_ZK" reconciliation. 
> After the next controller failover, or when the controller loads a metadata 
> snapshot, the correct state will be written to ZK. 
> The symptom of this bug is that we see the migration complete, and then all 
> the metadata removed from ZK. For example, 
> {code}
> [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper 
> to KRaft. 573 records were generated in 2204 ms across 51 batches. The record 
> types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, 
> PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an 
> epoch of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5].
> {code}
> immediately followed by:
> {code}
> [KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling 
> with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41}
> {code}
> If affected by this, a quick workaround is to cause the controller to 
> failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15825) KRaft controller writes empty state to ZK after migration

2023-11-14 Thread David Arthur (Jira)
David Arthur created KAFKA-15825:


 Summary: KRaft controller writes empty state to ZK after migration
 Key: KAFKA-15825
 URL: https://issues.apache.org/jira/browse/KAFKA-15825
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 3.6.0
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 3.7.0, 3.6.1


Immediately following the ZK migration, there is a race condition where the 
KRaftMigrationDriver can use an empty MetadataImage when performing the full 
"SYNC_KRAFT_TO_ZK" reconciliation. 

After the next controller failover, or when the controller loads a metadata 
snapshot, the correct state will be written to ZK. 

The symptom of this bug is that we see the migration complete, and then all the 
metadata removed from ZK. For example, 

{code}
[KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper 
to KRaft. 573 records were generated in 2204 ms across 51 batches. The record 
types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, 
PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an epoch 
of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5].
{code}

immediately followed by:

{code}
[KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling 
with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41}
{code}

If affected by this, a quick workaround is to cause the controller to failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-11-14 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15605.
--
Fix Version/s: 3.7.0
   Resolution: Fixed

> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This includes topics which 
> have been marked for deletion by the ZK controller. After being migrated to 
> KRaft, the pending topic deletions are never completed, so it is as if the 
> delete topic request never happened.
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.
> *Note to operators:*
> To determine if a migration was affected by this, an operator can check the 
> contents of {{/admin/delete_topics}} after the KRaft controller has migrated 
> the metadata. If any topics are listed under this ZNode, they were not 
> deleted and will still be present in KRaft. At this point the operator can 
> make a determination if the topics should be re-deleted (using 
> "kafka-topics.sh --delete") or left in place. In either case, the topics 
> should be removed from {{/admin/delete_topics}} to prevent unexpected topic 
> deletion in the event of a fallback to ZK.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2023-11-14 Thread via GitHub


gaurav-narula opened a new pull request, #14760:
URL: https://github.com/apache/kafka/pull/14760

   This PR changes the handling of authenticationException on a request from 
the node to the controller.
   
   We disconnect controller connection and invalidate the cache so that the 
next run of the thread will establish a connection with the (potentially) 
updated controller.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-14 Thread via GitHub


junrao commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1810855040

   @apoorvmittal10 : Thanks for looking into the test failures. There is an 
ongoing discussion on requiring a green build before merging the PR. I will 
need to wait for the result of that discussion before merging the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-14 Thread via GitHub


philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810853645

   Basically future.get() API only return 3 types of exceptions: 
ExecutionException, InterruptedException, and Cancellation per documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-14 Thread via GitHub


philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810851585

   Hi @cadonna - When the consumer is woken up.  The WakeupTrigger should 
complete the future exceptionally with WakeupException.  To rethrow that 
exception during future.get(), you will need to examine the ExecutionException 
kind of like this: 
   ```
   catch (ExecutionException e) {
   Throwable t = e.getCause();
   
   if (t instanceof WakeupException)
   throw new WakeupException();
   else if (t instanceof KafkaException)
   throw (KafkaException) t;
   else
   throw new KafkaException(t);
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]

2023-11-14 Thread via GitHub


jeqo commented on PR #14759:
URL: https://github.com/apache/kafka/pull/14759#issuecomment-1810780948

   cc @satishd 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-14 Thread via GitHub


philipnee commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1810775229

   Hi @lucasbru - Yes. It is not quite feasible to use the backgroundEventQueue 
because of the behavior of the current API, i.e., the callback needs to be 
invoked ouside of the poll call.  If we use the backgroundEventQueue, then the 
operation can become quite complicated, i.e. we will need to scan through the 
queue, execute the callback, and remove the callback from the queue.  @kirktrue 
- mind comment here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392979091


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
 actionQueue.add {
-  () =>
-allResults.foreach { case (topicPartition, result) =>
-  val requestKey = TopicPartitionOperationKey(topicPartition)
-  result.info.leaderHwChange match {
-case LeaderHwChange.INCREASED =>
-  // some delayed operations may be unblocked after HW changed
-  delayedProducePurgatory.checkAndComplete(requestKey)
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.SAME =>
-  // probably unblock some follower fetch requests since log end 
offset has been updated
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.NONE =>
-// nothing
+  () => allResults.foreach { case (topicPartition, result) =>
+val requestKey = TopicPartitionOperationKey(topicPartition)
+result.info.leaderHwChange match {
+  case LeaderHwChange.INCREASED =>
+// some delayed operations may be unblocked after HW changed
+delayedProducePurgatory.checkAndComplete(requestKey)
+delayedFetchPurgatory.checkAndComplete(requestKey)
+delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.SAME =>
+// probably unblock some follower fetch requests since log end 
offset has been updated
+delayedFetchPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.NONE =>
+  // nothing

Review Comment:
   nit: looks misaligned



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]

2023-11-14 Thread via GitHub


jeqo opened a new pull request, #14759:
URL: https://github.com/apache/kafka/pull/14759

   Cherrypick from https://github.com/apache/kafka/pull/14727 into trunk
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392978203


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   added the new name to the class value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1810767025

   Hi @kirktrue Thanks for taking time to clean up the task. I left some 
comments there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392968241


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.
+ * 
+ * 
+ * If there is already an pending wakeup from a previous call to 
{@link Consumer#wakeup()}, do nothing.
+ * We keep the internal state as is so that the future calls to 
{@link #setActiveTask(CompletableFuture)}
+ * will fail as expected.
+ * 
+ * 
  */
-public void wakeup() {
-pendingTask.getAndUpdate(task -> {
-if (task == null) {
-return new WakeupFuture();
-} else if (task instanceof ActiveFuture) {
-ActiveFuture active = (ActiveFuture) task;
-active.future().completeExceptionally(new WakeupException());
+void wakeup() {
+activeTask.getAndUpdate(existingTask -> {
+if (existingTask == null) {
+// If there isn't an existing task, return our marker, so that 
the subsequent call will
+// know wakeup was previously called.
+return FAIL_NEXT_MARKER;
+} else if (existingTask instanceof CompletableFuture) {
+// If there is an existing "active" task, complete it with 
WakeupException.
+CompletableFuture active = (CompletableFuture) 
existingTask;
+active.completeExceptionally(new WakeupException());
+
+// We return a null here to effectively unset the "active" 
task.
 return null;
 } else {
-return task;
+// This is the case where the existing task is the wakeup 
marker. So the user has apparently
+// called Consumer.wakeup() more than once.
+return existingTask;
 }
 });
 }
 
 /**
- * If there is no pending task, set the pending task active.
- * If wakeup was called before setting an active task, the current 
task will complete exceptionally with
- * WakeupException right
- * away.
- * if there is an active task, throw exception.
- * @param currentTask
- * @param 
- * @return
+ * This method should be called before execution a blocking operation in 
the {@link Consumer}. This will
+ * store an internal reference to the given active {@link 
CompletableFuture task} that can be
+ * {@link CompletableFuture#completeExceptionally(Throwable) forcibly 
failed} if the user invokes the
+ * {@link Consumer#wakeup()} call before or during its execution.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask} and no
+ * previous calls to {@link #wakeup()}, set the given {@link 
CompletableFuture} as the
+ * active task.
+ * 
+ * 
+ * If there was a previous call to {@link #wakeup()}, the given 
{@link CompletableFuture task} will fail
+ * via {@link CompletableFuture#completeExceptionally(Throwable)} 
and the active task will be

Review Comment:
   similarly - should we 

Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392964486


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.

Review Comment:
   should we mention about WakeupException - something like active task 
compelted exceptionally via WakeupException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392962733


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.
+ * 
+ * 
+ * If there is already an pending wakeup from a previous call to 
{@link Consumer#wakeup()}, do nothing.

Review Comment:
   "a pending"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392954694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392953096


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   yeah. I think it could be useful to rename action queue so that the compiler 
catches such a change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


artemlivshits commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392946436


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   Wow, this is so subtle, I stared at the change for some time to understand 
what it actually does; it's almost impossible to spot in the code review and 
the compiler cannot help either, what can we do to catch these issues in the 
future?  I think we should rename the member variable to be something like 
`defaultActionQueue` so that if we don't pass the `actionQueue` the compiler 
would catch it.
   
   Another question (probably to @dajac) -- is passing the `actionQueue` in the 
argument just an optimization or a correctness issue?  If it's just an 
optimization, I wonder what would be the effects of removing it and reduce the 
overall system complexity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810719729

   > avoids calling exit(1) in junit tests, which will kill Jenkins dead (even 
after 3 decades of Java, we don't have the technology to intercept exit() in 
unit testrs >:( )
   
   @cmccabe I'm confused, I thought that was the whole point of 
`kafka.utils.Exit`
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24
   
https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
   
   The `FaultHandler` concept in `QuorumController` seems great, but I'm not 
sure how it fits here: afaict the fault is always fatal and we can intercept it 
in tests - what am I missing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield opened a new pull request, #14758:
URL: https://github.com/apache/kafka/pull/14758

   This PR parameterizes the consumer integration tests so they can be run 
against
   the existing "generic" group protocol and the new "consumer" group protocol
   introduced in KIP-848.
   
   The KIP-848 client code is under construction so some of the tests do not 
run on
   both variants to start with, but the idea is that the tests can be enabled 
as the gaps
   in functionality are closed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Reviewer:   (was: Walker Carlson)

> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't 
> check if partition is subscribed by checking TopicPartitionState cached is 
> null or not, as done by 
> [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
>  So it throws IllegalStateException for a partition that is yet not 
> subscribed.
> Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
> awkward. This can be seen from the example code below. For example, at line 1 
> partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could 
> be removed from subscribed partitions(in a separate thread). So this forces 
> the user of this class to handle IllegalStateException which is awkward.
> {code:java}
> // Following is example code for the user of 
> SubscriptionState::maybeValidatePositionForCurrentLeader
> Set allCurrentlySubscribedTopics = 
> subscriptionState.assignedPartitions(); // line 1
> if(allCurrentlySubscribedTopics.contains(tp)) {
>      ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
> metadata.currentLeader(tp);
>   try() {
> subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
> leaderAndEpoch); // line 2
>   } catch (IllegalStateException e) {
>// recover from it. // line 3
>   }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: 
As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't 
check if partition is subscribed by checking TopicPartitionState cached is null 
or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
awkward. This can be seen from the example code below. For example, at line 1 
partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions(in a separate thread). So this forces the 
user of this class to handle IllegalStateException which is awkward.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
   // recover from it. // line 3
  }
}{code}
 

  was:
As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
check if partition is subscribed by checking TopicPartitionState cached is null 
or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
awkward. This can be seen from the example code below. For example, at line 1 
partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions(in a separate thread). So this forces the 
user of this class to handle IllegalStateException which is awkward.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
   // recover from it. // line 3
  }
}{code}
 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't 
> check if partition is subscribed by checking TopicPartitionState cached is 
> null or not, as done by 
> [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
>  So it throws IllegalStateException for a partition that is yet not 
> subscribed.
> Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
> awkward. This can be seen from the example code below. For example, at line 1 
> partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could 
> be removed from subscribed partitions(in a separate thread). So this forces 
> the user of this class to handle IllegalStateException which is awkward.
> {code:java}
> // Following is example code for the user of 
> SubscriptionState::maybeValidatePositionForCurrentLeader
> Set allCurrentlySubscribedTopics = 
> subscriptionState.assignedPartitions(); // line 1
> if(allCurrentlySubscribedTopics.contains(tp)) {
>      ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
> metadata.currentLeader(tp);
>   try() {
> subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
> leaderAndEpoch); // line 2
>   } catch (IllegalStateException 

[PR] Kip 951 SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]

2023-11-14 Thread via GitHub


msn-tldr opened a new pull request, #14757:
URL: https://github.com/apache/kafka/pull/14757

   See the motivation in jira description 
https://issues.apache.org/jira/browse/KAFKA-15824
   
   Noticed this was discovered as `ReassignReplicaShrinkTest` started to fail 
with KIP-951 changes. KIP-951 changes since then have been reverted([PR](
   https://github.com/apache/kafka/pull/1)), would be put back on this is 
in.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: 
As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
check if partition is subscribed by checking TopicPartitionState cached is null 
or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
awkward. This can be seen from the example code below. For example, at line 1 
partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions(in a separate thread). So this forces the 
user of this class to handle IllegalStateException which is awkward.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegalStateException e) {
   // recover from it. // line 3
  }
}{code}
 

  was:
As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
check if partition is subscribed by checking TopicPartitionState cached is null 
or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check prevents writing thread-safe code w.r.t SubscriptionState 
class, this can be seen from the example code below. For example, at line 1 
partA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegatonStateException e) {
   // recover from it. // line 3
  }
}{code}
 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
> check if partition is subscribed by checking TopicPartitionState cached is 
> null or not, as done by 
> [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
>  So it throws IllegalStateException for a partition that is yet not 
> subscribed.
> Lack of this check writing thread-safe code w.r.t SubscriptionState class is 
> awkward. This can be seen from the example code below. For example, at line 1 
> partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could 
> be removed from subscribed partitions(in a separate thread). So this forces 
> the user of this class to handle IllegalStateException which is awkward.
> {code:java}
> // Following is example code for the user of 
> SubscriptionState::maybeValidatePositionForCurrentLeader
> Set allCurrentlySubscribedTopics = 
> subscriptionState.assignedPartitions(); // line 1
> if(allCurrentlySubscribedTopics.contains(tp)) {
>      ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
> metadata.currentLeader(tp);
>   try() {
> subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
> leaderAndEpoch); // line 2
>   } catch (IllegalStateException e) {
>// recover from it. // line 3
>   }
> }{code}
>  



--
This message was sent by Atlassian Jira

[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: 
As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
check if partition is subscribed by checking TopicPartitionState cached is null 
or not, as done by 
[maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
 So it throws IllegalStateException for a partition that is yet not subscribed.

Lack of this check prevents writing thread-safe code w.r.t SubscriptionState 
class, this can be seen from the example code below. For example, at line 1 
partA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
removed from subscribed partitions.
{code:java}
// Following is example code for the user of 
SubscriptionState::maybeValidatePositionForCurrentLeader

Set allCurrentlySubscribedTopics = 
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
  try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
leaderAndEpoch); // line 2
  } catch (IllegatonStateException e) {
   // recover from it. // line 3
  }
}{code}
 

  was:As can be seen 
[here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]],
 it doesn't check if partition is subscribed by checking TopicPartitionState 
cached is null or not, as done here by maybe 


> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't 
> check if partition is subscribed by checking TopicPartitionState cached is 
> null or not, as done by 
> [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]].
>  So it throws IllegalStateException for a partition that is yet not 
> subscribed.
> Lack of this check prevents writing thread-safe code w.r.t SubscriptionState 
> class, this can be seen from the example code below. For example, at line 1 
> partA would be in allCurrentlySubscribedTopics, but at line 2 it could be 
> removed from subscribed partitions.
> {code:java}
> // Following is example code for the user of 
> SubscriptionState::maybeValidatePositionForCurrentLeader
> Set allCurrentlySubscribedTopics = 
> subscriptionState.assignedPartitions(); // line 1
> if(allCurrentlySubscribedTopics.contains(tp)) {
>      ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
> metadata.currentLeader(tp);
>   try() {
> subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, 
> leaderAndEpoch); // line 2
>   } catch (IllegatonStateException e) {
>// recover from it. // line 3
>   }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785956#comment-17785956
 ] 

Kamal Chandraprakash edited comment on KAFKA-15376 at 11/14/23 4:29 PM:


[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:

Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both the replicas A and B are insync on startup and they hold the leader-epoch 
0. Then, the brokers started to go down in ping-pong fashion. Each broker will 
hold the following epoch in it's leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is unclean-leader-election, the logs of Broker A and B might be 
diverged. As long as anyone of them is online, they continue to serve all the 
records according to the leader-epoch-checkpoint file. Once both the brokers 
becomes online, the follower truncates itself up-to the largest common log 
prefix offset so that the logs won't be diverged between the leader and 
follower. In this case, we continue to serve the data from the remote storage 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them holds the LE0.

Note that the approach taken here is similar to local-log where the broker will 
serve the log that they have until they sync with each other.

*Case-2*
Both the replicas A and B are out-of-sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & 
doesn't hold any data about the partition (empty-disk). When the Broker A goes 
down, there will be offline partition and B will be elected as unclean leader, 
the log-end-offset of the partition will be reset back to 0.

>From the example provided in the discussion:

At T1, Broker A
{code:java}
-
leader-epoch | start-offset |
-
 0  0
 1  180
 2  400
- {code}
At T2, Broker B, the start-offset will be reset back to 0: (Note that the 
leader does not interact with remote storage to find the next offset trade-off 
b/w availability and durabilty)
{code:java}
-
leader-epoch | start-offset |
-
 3  0
 4  780
 6  900
 7  990 
- {code}
Now, if we hold the data for both the lineage and ping-pong the brokers, we 
will be serving the diverged data back to the client for the same fetch-offset 
depends on the broker which is online. Once, the replicas start to interact 
with each other, they truncate the remote data themselves based on the current 
leader epoch lineage.

The example provided in the discussion is applicable only for case-2 where the 
replicas never interacted among themselves at-least once. 


was (Author: ckamal):
[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:


Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both the replicas A and B are insync on startup and they hold the leader-epoch 
0. Then, the brokers started to go down in ping-pong fashion. Each broker will 
hold the following epoch in it's leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is unclean-leader-election, the logs of Broker A and B might be 
diverged. As long as anyone of them is online, they continue to serve all the 
records according to the leader-epoch-checkpoint file. Once both the brokers 
becomes online, the follower truncates itself up-to the largest common log 
prefix offset so that the logs won't be diverged between the leader and 
follower. In this case, we continue to serve the data from the remote storage 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them holds the LE0. 

Note that the approach taken here is similar to local-log where the broker will 
serve the log that they have until they sync with each other.

*Case-2*
Both the replicas A and B are out-of-sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & 
doesn't hold any data about the partition (empty-disk). When the Broker A goes 
down, there will be offline partition and B will be elected as unclean leader, 
the log-end-offset of the partition will be reset back to 0.

>From the example provided in the discussion:

At T1, Broker A

{code:java}
-
leader-epoch | start-offset |
-

Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-14 Thread via GitHub


satishd merged PR #14727:
URL: https://github.com/apache/kafka/pull/14727


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785962#comment-17785962
 ] 

Kamal Chandraprakash commented on KAFKA-15376:
--

With unclean-leader-election enabled, there can be log-divergence, log-loss, 
and exactly-once-delivery is not applicable. We are trying to extend the same 
contract that is for local storage to remote when this feature is enabled. 
There are pros and cons to this feature:

*Pros*

1. The replica will serve the data that it seen so far back to the client even 
if it never interact with any other replica.

*Cons*

1. RemoteStorageManager / RemoteLogManager will have additional work to 
maintain the unreferenced segments and cleaning up them.

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-11-14 Thread Christian Lefebvre (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785958#comment-17785958
 ] 

Christian Lefebvre commented on KAFKA-15513:


Hi [~pprovenzano] ,

  isn't it the role of "{{{}kafka-storage --add-scram{}}}" command to create 
users than may be locally recognized by controller ?

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Description: As can be seen 
[here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]],
 it doesn't check if partition is subscribed by checking TopicPartitionState 
cached is null or not, as done here by maybe   (was: Right now in java-client, 
producer-batches backoff upto retry.backoff.ms(100ms by default). This Jira 
proposes that backoff should be skipped if client knows of a newer-leader for 
the partition in a sub-sequent retry(typically through refresh of 
parition-metadata via the Metadata RPC). This would help improve the latency of 
the produce-request around when partition leadership changes.)

> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen 
> [here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]],
>  it doesn't check if partition is subscribed by checking TopicPartitionState 
> cached is null or not, as done here by maybe 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-14 Thread via GitHub


satishd commented on PR #14727:
URL: https://github.com/apache/kafka/pull/14727#issuecomment-1810604177

   There are a few unrelated test failures, merging the changes to trunk and 
3.6 branches. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Fix Version/s: (was: 3.6.1)

> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0
>
>
> Right now in java-client, producer-batches backoff upto 
> retry.backoff.ms(100ms by default). This Jira proposes that backoff should be 
> skipped if client knows of a newer-leader for the partition in a sub-sequent 
> retry(typically through refresh of parition-metadata via the Metadata RPC). 
> This would help improve the latency of the produce-request around when 
> partition leadership changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayank Shekhar Narula updated KAFKA-15824:
--
Issue Type: Bug  (was: Improvement)

> SubscriptionState's maybeValidatePositionForCurrentLeader should handle 
> partition which isn't subscribed yet
> 
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Right now in java-client, producer-batches backoff upto 
> retry.backoff.ms(100ms by default). This Jira proposes that backoff should be 
> skipped if client knows of a newer-leader for the partition in a sub-sequent 
> retry(typically through refresh of parition-metadata via the Metadata RPC). 
> This would help improve the latency of the produce-request around when 
> partition leadership changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

2023-11-14 Thread Mayank Shekhar Narula (Jira)
Mayank Shekhar Narula created KAFKA-15824:
-

 Summary: SubscriptionState's maybeValidatePositionForCurrentLeader 
should handle partition which isn't subscribed yet
 Key: KAFKA-15824
 URL: https://issues.apache.org/jira/browse/KAFKA-15824
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Mayank Shekhar Narula
Assignee: Mayank Shekhar Narula
 Fix For: 3.7.0, 3.6.1


Right now in java-client, producer-batches backoff upto retry.backoff.ms(100ms 
by default). This Jira proposes that backoff should be skipped if client knows 
of a newer-leader for the partition in a sub-sequent retry(typically through 
refresh of parition-metadata via the Metadata RPC). This would help improve the 
latency of the produce-request around when partition leadership changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785956#comment-17785956
 ] 

Kamal Chandraprakash commented on KAFKA-15376:
--

[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:


Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both the replicas A and B are insync on startup and they hold the leader-epoch 
0. Then, the brokers started to go down in ping-pong fashion. Each broker will 
hold the following epoch in it's leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is unclean-leader-election, the logs of Broker A and B might be 
diverged. As long as anyone of them is online, they continue to serve all the 
records according to the leader-epoch-checkpoint file. Once both the brokers 
becomes online, the follower truncates itself up-to the largest common log 
prefix offset so that the logs won't be diverged between the leader and 
follower. In this case, we continue to serve the data from the remote storage 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them holds the LE0. 

Note that the approach taken here is similar to local-log where the broker will 
serve the log that they have until they sync with each other.

*Case-2*
Both the replicas A and B are out-of-sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & 
doesn't hold any data about the partition (empty-disk). When the Broker A goes 
down, there will be offline partition and B will be elected as unclean leader, 
the log-end-offset of the partition will be reset back to 0.

>From the example provided in the discussion:

At T1, Broker A

{code:java}
-
leader-epoch | start-offset |
-
 0  0
 1  180
 2  400
- {code}

At T2, Broker B, the start-offset will be reset back to 0: (Note that the 
leader does not interact with remote storage to find the next offset trade-off 
b/w availability and durabilty)
{code:java}
-
leader-epoch | start-offset |
-
 3  0
 4  780
 6  900
 7  990 
- {code}

Now, if we hold the data for both the lineage and ping-pong the brokers, we 
will be serving the diverged data back to the client for the same fetch-offset 
depends on the broker which is online. Once, the replicas start to interact 
with each other, they truncate the remote data themselves based on the current 
leader epoch lineage.

The example provided in the discussion is applicable only when the replicas 
never interacted among themselves at-least once. 

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-14 Thread via GitHub


coltmcnealy-lh commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1392849939


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map 
tasks,
 // no records to restore; in this case we just initialize the 
sensor to zero
 final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+}  else if (changelogMetadata.stateManager.taskType() == 
TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
   I think the `endOffset == null` could happen when the consumer hasn't yet 
made a `poll()` for a certain partition. Which means that I think it will be 
null in most cases, honestly.
   
   So this is actually a bit of a dilemma. Since in most cases we won't know 
the end offset until we have made our first call to `poll()` (and then 
`onBatchLoaded()`), the way the code is currently written makes me think we 
will almost never have a call to `onUpdateStart()`, which kind of defeats the 
purpose of the `onUpdateStart()` callback.
   
   I see two options here:
   1.  Pass in some sentinel value when we don't know the `endOffset` upon 
initialization of the Standby Task. Sentinel value could be `-1` or `null` or 
`Optional.Empty`...apparently my team thinks I am a really crappy programmer, 
so I don't have the right to opine on which one :joy: 
   2. Remove the `endOffset` parameter from the `onUpdateStart()` method 
signature. This might require changing the KIP but I don't think it would take 
a vote.
   
   Personally, I prefer option 2). In the most common case, we won't have the 
end offset, so I wouldn't want to "mislead" someone reading the javadoc into 
thinking that they might get some info that we probably don't have.
   
   And the `onUpdateStart()` in practice is normally followed by 
`onBatchLoaded()` just a few hundred milliseconds later. The `onBatchLoaded()` 
will contain the `endOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392830639


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-private Optional 
targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
 /**
  * Logger.
  */
 private final Logger log;
 
-public MembershipManagerImpl(String groupId, LogContext logContext) {
-this(groupId, null, null, logContext);
+/**
+ * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+ * enabled)
+ */
+private final CommitRequestManager commitRequestManager;
+
+/**
+ * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+ * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+ * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+ * requests in cases where a currently assigned topic is in the target 
assignment (new
+ * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+ * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+ * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+ */
+private final Map assignedTopicNamesCache;
+
+/**
+ * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+ * Items are added to this set every time a target assignment is received. 
Items are removed
+ * when metadata is found for the topic. This is where the member collects 
all assignments
+ * received from the broker, even though they may not be ready to 
reconcile due to missing
+ * metadata.
+ */
+private final Map> assignmentUnresolved;
+
+/**
+ * Assignment received for which topic names have been resolved, so it's 
ready to be
+ * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+ * available), or when a metadata update is received. This is where the 
member keeps all the
+ * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+ * is already another on in process.
+ */
+private final SortedSet assignmentReadyToReconcile;
+
+/**
+ * Epoch that a member must include a heartbeat request to indicate that 
it want to join or
+ * re-join a group.
+ */
+public static final int JOIN_GROUP_EPOCH = 0;
+
+/**
+ * If there is a reconciliation running (triggering commit, callbacks) for 
the
+ * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+ * after receiving a heartbeat response, or a metadata update.
+ */
+private boolean reconciliationInProgress;
+
+/**
+ * ID the member had when the reconciliation in progress started. This is 
used to identify if
+ * the member has rejoined while it was reconciling an assignment (in 
which case the result
+ * of the reconciliation is not applied.)
+ */
+private String memberIdOnReconciliationStart;
+
+

Review Comment:
   Those kinds of things tend to jump out in _other people's_ code, but I 
frequently miss them in my own  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392829031


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,

Review Comment:
   I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,

Review Comment:
   I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392827512


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This 
will make the consumer
+ * release all its assignments and send a heartbeat request to leave the 
consumer group.
+ * This event holds a future that will complete when the invocation of 
callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort 
to send the
+ * leave group heartbeat, without waiting for any response or considering 
timeouts).
+ */
+public class UnsubscribeApplicationEvent extends 
CompletableApplicationEvent {

Review Comment:
   Where does it block? I didn't see a call to `Future.get()` when I looked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >