[GitHub] [kafka] showuon commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-15 Thread via GitHub


showuon commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1471333242

   @robobario, nice catch! You're welcome to submit a PR to fix it. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] garyparrot opened a new pull request, #13398: MINOR: Fix typos in valid range of socket buffer size

2023-03-15 Thread via GitHub


garyparrot opened a new pull request, #13398:
URL: https://github.com/apache/kafka/pull/13398

   Fix the following typos in the Kafka Connect configs [send.buffer.bytes](https://kafka.apache.org/documentation/#connectconfigs_send.buffer.bytes) and [receive.buffer.bytes](https://kafka.apache.org/documentation/#connectconfigs_receive.buffer.bytes).
   
   ![image](https://user-images.githubusercontent.com/39105714/225489012-9b6f2205-fbcb-4cf9-939c-d6c96cbe6afb.png)
   
   Currently, the valid values start from `0`, although the documentation states that `-1` is also a valid value.
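
   For reference, a short sketch of what the documented contract implies (illustrative `Properties` usage; the `CommonClientConfigs` constants resolve to `send.buffer.bytes` and `receive.buffer.bytes`):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.CommonClientConfigs;
   
   public class BufferConfigExample {
       public static void main(String[] args) {
           // Per the docs, -1 should be accepted and means "use the OS default
           // buffer size", so the valid range should start at -1 rather than 0.
           Properties clientProps = new Properties();
           clientProps.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -1);      // send.buffer.bytes
           clientProps.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -1);   // receive.buffer.bytes
           System.out.println(clientProps);
       }
   }
   ```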
   





[GitHub] [kafka] robobario commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-15 Thread via GitHub


robobario commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1471191098

   Hi, I think this introduced a race condition that can cause the producer-perf-test to log a lot of exceptions to the console if num-records > 50. Since `iteration` is incremented from the callback, multiple threads may see the same value when deciding whether to update `latencies`, so the index can go out of bounds when `latencies` is updated.
   ```
   bin/kafka-producer-perf-test.sh --topic perf-test --throughput -1 --num-records 100 --producer-props acks=all bootstrap.servers=localhost:9092 --record-size 50
   ```
   Sometimes it logs:
   ```
   [2023-03-16 15:30:57,432] ERROR Error executing user-provided callback on message for topic-partition 'perf-test-0' (org.apache.kafka.clients.producer.internals.ProducerBatch)
   java.lang.ArrayIndexOutOfBoundsException: Index 51 out of bounds for length 51
   at org.apache.kafka.tools.ProducerPerformance$Stats.record(ProducerPerformance.java:38
   ```
   It doesn't kill the test run, but it's alarming to see a couple of hundred errors in the console.
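
   For readers following along, here is a minimal sketch of the failure mode (a hypothetical reduction, not the actual `ProducerPerformance` code) and one thread-safe alternative:
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical reduction of the race: Kafka producer I/O threads invoke send
   // callbacks concurrently, so Stats-style bookkeeping must be thread-safe.
   class LatencyStatsSketch {
       private final long[] latencies = new long[51];
       private int iteration = 0;                           // shared, non-atomic
       private final AtomicInteger nextSlot = new AtomicInteger();
   
       // Racy: two callbacks can both read iteration == 50, both pass the bounds
       // check, and one write lands at latencies[51] -> ArrayIndexOutOfBoundsException.
       void recordRacy(long latencyMs) {
           if (iteration < latencies.length) {
               latencies[iteration++] = latencyMs;          // read-modify-write is not atomic
           }
       }
   
       // Safer: atomically claim a slot first, then bounds-check the claimed slot.
       void recordSafe(long latencyMs) {
           int slot = nextSlot.getAndIncrement();
           if (slot < latencies.length) {
               latencies[slot] = latencyMs;
           }
       }
   }
   ```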





[jira] [Resolved] (KAFKA-13884) KRaft Observers are not required to flush on every append

2023-03-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-13884.

Fix Version/s: 3.5.0
   Resolution: Fixed

> KRaft Observers are not required to flush on every append
> ---
>
> Key: KAFKA-13884
> URL: https://issues.apache.org/jira/browse/KAFKA-13884
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.5.0
>
>
> The current implementation of the KRaft client flushes to disk when observers 
> append to the log. This is not required, since observers don't participate in 
> leader election or the advancement of the high-watermark.
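
In code terms, the resolved change amounts to something like the following sketch (illustrative only; `ReplicaLog` and `AppendHandler` are hypothetical stand-ins, not the KRaft client's actual abstractions):

```java
// Illustrative sketch, not actual Kafka code: only voters fsync on append,
// because voter durability feeds leader election and high-watermark advancement.
interface ReplicaLog {
    void append(byte[] record);
    void flush(); // fsync to disk
}

final class AppendHandler {
    private final ReplicaLog log;
    private final boolean isVoter;

    AppendHandler(ReplicaLog log, boolean isVoter) {
        this.log = log;
        this.isVoter = isVoter;
    }

    void handleAppend(byte[] record) {
        log.append(record);
        if (isVoter) {
            log.flush(); // observers skip this and let the OS flush lazily
        }
    }
}
```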



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-15 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1137901882


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
+
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    // Check if we have already (either node or individual transaction).
+    val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+    currentNodeAndTransactionDataOpt match {
+      case None =>
+        nodesToTransactions.put(node,
+          new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+            mutable.Map(transactionData.transactionalId() -> callback)))
+      case Some(currentNodeAndTransactionData) =>
+        // Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue.
+        val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+        if (currentTransactionData != null) {
+          if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) {
+            val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+            currentTransactionData.topics().forEach { topic =>
+              topic.partitions().forEach { partition =>
+                topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH)
+              }
+            }
+            val callback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+            currentNodeAndTransactionData.transactionData.remove(transactionData.transactionalId())
+            callback(topicPartitionsToError.toMap)
+          } else {
+            // We should never see a request on the same epoch since we haven't finished handling the one in queue
+            throw new InvalidRecordException("Received a second request from the same connection without finishing the first.")
+          }
+        }
+        currentNodeAndTransactionData.transactionData.add(transactionData)
+        currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+    }
+    wakeup()
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      inflightNodes.synchronized(inflightNodes.remove(node))
+      if (response.authenticationException() != null) {
+        error(s"AddPartitionsToTxnRequest failed for broker 

[GitHub] [kafka] jolshan commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-15 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1137856849



[GitHub] [kafka] cmccabe merged pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration

2023-03-15 Thread via GitHub


cmccabe merged PR #13384:
URL: https://github.com/apache/kafka/pull/13384





[GitHub] [kafka] dajac commented on pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-15 Thread via GitHub


dajac commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1470710416

   @CalvinConfluent Could you rebase as well?





[GitHub] [kafka] jsancio merged pull request #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-15 Thread via GitHub


jsancio merged PR #13396:
URL: https://github.com/apache/kafka/pull/13396





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1137608147


##
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
        }
    }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment:
   ReplicaStateDowngrade?






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-15 Thread via GitHub


guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1137588681


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
        this.timer.update(currentTimeMs);
    }
 }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   I saw that we retry only on `COORDINATOR_LOAD_IN_PROGRESS`, `NOT_COORDINATOR`, and `unstableTxnOffsetTopicPartitions`. Just confirming with you: are those the only possible retriable errors for this response (I saw in `OffsetFetchResponse` there are others)? What about `COORDINATOR_NOT_AVAILABLE`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
        this.timer.update(currentTimeMs);
    }
 }
+
+    private class FetchCommittedOffsetResponseHandler {
+        private final UnsentOffsetFetchRequestState request;
+
+        private FetchCommittedOffsetResponseHandler(final UnsentOffsetFetchRequestState request) {
+            this.request = request;
+        }
+
+        public void onResponse(
+            final long currentTimeMs,
+            final OffsetFetchResponse response) {
+            Errors responseError = response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                // TODO: I'm not sure if we should retry here. Sounds like we should propagate the error to let the
+                //  user fix the permission
+                request.future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
+            }
+            return;
+        }
+
+        private void retry(final long currentTimeMs) {
+            this.request.onFailedAttempt(currentTimeMs);
+            unsentOffsetFetchRequests.enqueue(this.request);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+                response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        request.future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        System.out.println("asdkljlsadjflksajfdlk");
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " +
+                            "response for partition " + tp + ": " + error.message()));
+                        return;
+                    }
+                } else if 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-15 Thread via GitHub


guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1137549960


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
        this.timer.update(currentTimeMs);
    }
 }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   Ah thanks for pointing it out, will check.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-15 Thread via GitHub


guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1137547234


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
    }
 }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * This is used to support the committed() API. Here we use a Java collection, {@code unsentRequests}, to
+     * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * If the request is new, it will be enqueued to the {@code unsentRequests} and will be sent upon the next
+     * poll.
+     *
+     * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to prevent duplication against sent-but-incomplete requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {

Review Comment:
   Maybe it's not that common, but the current flag we use covers both periods, and I wonder if it was done that way to avoid some surprising cases. So my rationale is just to keep the behavior consistent with the old code logic.
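
   As a side note for readers, the deduplication described in the javadoc above reduces to a small pattern; here is a hypothetical sketch with simplified types (not the PR's actual code):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical reduction of the dedup idea: if an identical request is
   // already queued, piggyback on its future instead of enqueueing a copy.
   final class UnsentRequests<K, V> {
       private final Map<K, CompletableFuture<V>> pending = new HashMap<>();
   
       // Returns the future the caller should wait on; only the first caller
       // for a given key actually enqueues new work.
       synchronized CompletableFuture<V> enqueue(K requestKey) {
           CompletableFuture<V> existing = pending.get(requestKey);
           if (existing != null) {
               return existing; // same request outstanding: complete with it
           }
           CompletableFuture<V> fresh = new CompletableFuture<>();
           pending.put(requestKey, fresh);
           return fresh;
       }
   
       synchronized void complete(K requestKey, V result) {
           CompletableFuture<V> f = pending.remove(requestKey);
           if (f != null) {
               f.complete(result); // wakes every caller that shared this future
           }
       }
   }
   ```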






[GitHub] [kafka] cmccabe commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft

2023-03-15 Thread via GitHub


cmccabe commented on code in PR #13368:
URL: https://github.com/apache/kafka/pull/13368#discussion_r1137526482


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging
 }
   }
 
+  def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    // This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial)
+    var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
+    def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+      allAcls = allAcls.updated(resourcePattern, versionedAcls)
+    }
+
+    AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)
+    allAcls.foreach { case (resourcePattern, versionedAcls) =>

Review Comment:
   The main issue that I see here is that batches could become too big. Could 
you add some code to limit batches to 100 records or so?
   
   We had a similar bug in snapshot generation until we decoupled batching from 
record generation. So we should probably do the same thing here.  In other 
words, recordConsumer should take individual records and do the batching 
itself. Since we'll be in a metadata transaction, the batching is not 
meaningful here.
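
   To make the suggestion concrete, here is a hypothetical sketch of decoupling batching from record generation (`BatchingRecordConsumer` and the generic record type are illustrative, not the actual migration code):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;
   
   // Hypothetical sketch: callers hand over records one at a time, and the
   // consumer itself flushes a batch every MAX_BATCH_SIZE records.
   final class BatchingRecordConsumer<T> implements Consumer<T> {
       private static final int MAX_BATCH_SIZE = 100;
   
       private final Consumer<List<T>> batchWriter;
       private final List<T> buffer = new ArrayList<>(MAX_BATCH_SIZE);
   
       BatchingRecordConsumer(Consumer<List<T>> batchWriter) {
           this.batchWriter = batchWriter;
       }
   
       @Override
       public void accept(T record) {
           buffer.add(record);
           if (buffer.size() >= MAX_BATCH_SIZE) {
               flush();
           }
       }
   
       // Call once after all records have been consumed to emit the trailing
       // partial batch.
       void flush() {
           if (!buffer.isEmpty()) {
               batchWriter.accept(new ArrayList<>(buffer));
               buffer.clear();
           }
       }
   }
   ```
   
   Callers pass records to `accept` one at a time and call `flush()` once at the end, so batch size limits live in one place regardless of how records are generated.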






[GitHub] [kafka] cmccabe commented on pull request #13372: MINOR: Improved error handling in ZK migration

2023-03-15 Thread via GitHub


cmccabe commented on PR #13372:
URL: https://github.com/apache/kafka/pull/13372#issuecomment-1470482252

   @mumrah : `KRaftMigrationDriverTest.testMigrationWithClientException` is 
failing for me with this PR. Can you take a look?
   
   ```
   Gradle Test Run :metadata:test > Gradle Test Executor 3 > KRaftMigrationDriverTest > testMigrationWithClientException(boolean) > org.apache.kafka.metadata.migration.KRaftMigrationDriverTest.testMigrationWithClientException(boolean)[1] FAILED
       org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
           at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
           at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
           at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
           at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
           at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
           at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
           at app//org.apache.kafka.metadata.migration.KRaftMigrationDriverTest.testMigrationWithClientException(KRaftMigrationDriverTest.java:358)
   ```
   
   (Also Jenkins did something silly again, but hopefully the next build will 
work.)





[jira] [Created] (KAFKA-14811) The forwarding requests are discarded when network client is changed to/from zk/Kraft

2023-03-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-14811:
--

 Summary: The forwarding requests are discarded when network client 
is changed to/from zk/Kraft
 Key: KAFKA-14811
 URL: https://issues.apache.org/jira/browse/KAFKA-14811
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We don't check the in-flight requests when closing the stale network client. If the 
in-flight requests are related to a metadata request from a client, the client will 
get a timeout exception. If the in-flight requests are related to ISR/leader changes, 
the partition can't be written to, as it can't meet the min ISR.
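
One possible fix direction, sketched against a hypothetical stand-in interface (not the actual NetworkClient API), is to drain in-flight requests before closing the stale client:

```java
// Hypothetical sketch: wait (up to a deadline) for outstanding forwarded
// requests to complete before closing the client being swapped out.
interface ForwardingClient {
    int inFlightRequestCount();
    void poll(long timeoutMs); // services network I/O, completing responses
    void close();
}

final class ClientSwapper {
    void closeDraining(ForwardingClient stale, long drainTimeoutMs) {
        long deadline = System.currentTimeMillis() + drainTimeoutMs;
        // Keep polling so in-flight metadata and ISR/leader requests can
        // finish instead of timing out on the caller's side.
        while (stale.inFlightRequestCount() > 0 && System.currentTimeMillis() < deadline) {
            stale.poll(100L);
        }
        stale.close();
    }
}
```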





[jira] [Commented] (KAFKA-14802) topic deletion bug

2023-03-15 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700698#comment-17700698
 ] 

Divij Vaidya commented on KAFKA-14802:
--

Have you checked the value of the setting 
[auto.create.topics.enable|https://kafka.apache.org/documentation.html#brokerconfigs_auto.create.topics.enable]
 on the broker and the value of 
[allow.auto.create.topics|https://kafka.apache.org/documentation.html#consumerconfigs_allow.auto.create.topics]
 on the consumer? Enabling these settings can lead to auto-creation of a topic 
even after it has been deleted.
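
For completeness, a minimal sketch of ruling out consumer-side auto-creation (the broker address and group id below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoAutoCreateConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // With allow.auto.create.topics=false, the consumer's metadata requests
        // will not trigger broker-side topic auto-creation, even when the broker
        // still has auto.create.topics.enable=true.
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual...
        }
    }
}
```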

> topic deletion bug
> --
>
> Key: KAFKA-14802
> URL: https://issues.apache.org/jira/browse/KAFKA-14802
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.3.2
> Environment: AWS m5.xlarge EC2 instance
>Reporter: Behavox
>Priority: Major
> Attachments: server.properties
>
>
> Topic deletion doesn't work as expected: after successful deletion, the topic is 
> recreated. This happens in a multi-controller environment with 3 controllers and 
> ReplicationFactor: 2.
> How to reproduce: attempt to delete a topic. The topic is removed successfully and 
> recreated right after removal. The example below shows a single topic named 
> example-topic. We have a total of 17000 topics in the affected cluster.
>  
> Our config is attached. 
> Run 1
> [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> Run 2
> [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)





[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-15 Thread via GitHub


hudeqi commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1470060354

   Thanks! Would you be interested in taking a look at and discussing this [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms)? @chia7712 @showuon 





[GitHub] [kafka] chia7712 merged pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-15 Thread via GitHub


chia7712 merged PR #13348:
URL: https://github.com/apache/kafka/pull/13348





[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-15 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1469956438

   Many thanks, David. I will try to get to this in the next couple of days. Apologies for the delay; I wish I could have gotten to this sooner.





[GitHub] [kafka] Hangleton commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit

2023-03-15 Thread via GitHub


Hangleton commented on PR #13378:
URL: https://github.com/apache/kafka/pull/13378#issuecomment-1469954905

   Thanks for the merge David.





[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-15 Thread via GitHub


dajac commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1469825143

   @Hangleton I just merged https://github.com/apache/kafka/pull/13378. We can update this PR now.





[GitHub] [kafka] dajac merged pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit

2023-03-15 Thread via GitHub


dajac merged PR #13378:
URL: https://github.com/apache/kafka/pull/13378





[GitHub] [kafka] dajac commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit

2023-03-15 Thread via GitHub


dajac commented on PR #13378:
URL: https://github.com/apache/kafka/pull/13378#issuecomment-1469817830

   That's weird. The build is still reported as `in progress` in the PR, but it has completed. The last build results are here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13378/21/tests. Here are the failed tests:
   ```
   Build / JDK 11 and Scala 2.13 / shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
   16s
   Build / JDK 17 and Scala 2.13 / testReturnRecordsDuringRebalance() – org.apache.kafka.clients.consumer.KafkaConsumerTest
   <1s
   Build / JDK 17 and Scala 2.13 / testTrustStoreAlter(String).quorum=kraft – kafka.server.DynamicBrokerReconfigurationTest
   12s
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Existing failures - 6
   Build / JDK 11 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   1m 13s
   Build / JDK 11 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   1m 25s
   Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   1m 18s
   Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   1m 27s
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   1m 12s
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   ```
   None of those are related to this PR. I am going to merge the PR.
   
   





[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-15 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
    }
 }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.
+    public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {

Review Comment:
   nit: Let's add an empty line before this one.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
    }
 }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.

Review Comment:
   nit: I would remove the second sentence because I find it unclear. What does 
`upgrading` mean here?



##
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
        }
    }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment:
   nit: `...FetchStateDowngrade`?



##
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+  def testFetchRequestDowngrade(version: Short): Unit = {
+    val destinationId = 2
+    val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+    channel.updateEndpoint(destinationId, new InetAddressSpec(
+      new InetSocketAddress(destinationNode.host, destinationNode.port)))
+    sendTestRequest(ApiKeys.FETCH, destinationId)
+    channel.pollOnce()
+
+    assertEquals(1, client.requests().size())
+    val request = client.requests().peek().requestBuilder().build(version)
+
+    if (version < 15) {
+      assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1)

Review Comment:
   small nit: In Scala, we usually don't put the `()` for accessors. In this 
test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -983,16 +988,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to {}",
-                        request.replicaId(), fetchPartition.fetchOffset(), error);
+                        replicaId, fetchPartition.fetchOffset(), error);

Review Comment:
   nit: Let's keep the original indentation please.



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }
 
+    // This test mainly focuses on whether the leader state is correctly updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        context.deliverRequest(downgradedRequest);
+        context.client.poll();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+        assertEquals(1L, context.log.highWatermark().offset);

Review Comment:
   nit: Other tests use `context.client.highWatermark()`. Should we also use 
this one?



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }
 
+    // This test 

[GitHub] [kafka] sionsmith commented on pull request #10233: KAFKA-9413: Auditing in Kafka

2023-03-15 Thread via GitHub


sionsmith commented on PR #10233:
URL: https://github.com/apache/kafka/pull/10233#issuecomment-1469564517

   It would be great to consider this, as more companies are requiring audit functionality.





[GitHub] [kafka] badaiaqrandista closed pull request #13397: short circuiting reads during fetch computation if enough bytes have …

2023-03-15 Thread via GitHub


badaiaqrandista closed pull request #13397: short circuiting reads during fetch 
computation if enough bytes have …
URL: https://github.com/apache/kafka/pull/13397





[GitHub] [kafka] badaiaqrandista opened a new pull request, #13397: short circuiting reads during fetch computation if enough bytes have …

2023-03-15 Thread via GitHub


badaiaqrandista opened a new pull request, #13397:
URL: https://github.com/apache/kafka/pull/13397

   …been collected
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

