[jira] [Comment Edited] (KAFKA-14934) KafkaClusterTestKit makes FaultHandler accessible

2023-05-26 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726127#comment-17726127
 ] 

Owen C.H. Leung edited comment on KAFKA-14934 at 5/27/23 2:49 AM:
--

Hi [~mumrah], I think it is already exposed?

https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L518-L524
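For example, a test could assert on the handler along these lines (a minimal sketch; fatalFaultHandler() and firstException() are my assumptions about the accessors behind that link, not confirmed API):

{code:java}
// Minimal sketch, assuming accessors like fatalFaultHandler()/firstException()
// exist behind the link above; the exact names are an assumption.
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
        new TestKitNodes.Builder()
            .setNumBrokerNodes(1)
            .setNumControllerNodes(1)
            .build())
        .build()) {
    cluster.format();
    cluster.startup();
    // Instead of exiting the process, a startup fault would be recorded on the
    // mock handler, which the test can then inspect.
    assertNull(cluster.fatalFaultHandler().firstException());
}
{code}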


was (Author: JIRAUSER300460):
Hi [~mumrah], can I give this a try? I'm new to contributing to Kafka and want to
get my hands dirty with it.

> KafkaClusterTestKit makes FaultHandler accessible
> -
>
> Key: KAFKA-14934
> URL: https://issues.apache.org/jira/browse/KAFKA-14934
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: David Arthur
>Priority: Trivial
>  Labels: good-first-issue
>
> In KafkaClusterTestKit, we use a mock fault handler to avoid exiting the 
> process during tests. It would be useful to expose this fault handler so 
> tests could verify certain fault conditions (like a broker/controller failing 
> to start)





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-26 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207577607


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   This block of code reuses the existing logic in `TopicAdmin` to figure out whether a topic exists:
https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L503-L511.
I am caching the result because once we have established that a topic is deleted for
partition p1, we don't need to check again for the other partitions of the same
topic within this flow.
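   A rough sketch of the caching shape being described (illustrative only; the `topicExists` predicate below stands in for the `TopicAdmin`-based check and is not the PR's actual code):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: one existence check per topic name; other partitions of the
// same topic reuse the cached answer. topicExists stands in for the TopicAdmin-based
// check referenced above.
static Set<String> deletedTopics(Collection<TopicPartition> partitions, Predicate<String> topicExists) {
    Set<String> deleted = new HashSet<>();
    Set<String> existing = new HashSet<>();
    for (TopicPartition tp : partitions) {
        String topic = tp.topic();
        if (deleted.contains(topic) || existing.contains(topic)) {
            continue; // answer already cached for this topic
        }
        if (topicExists.test(topic)) {
            existing.add(topic);
        } else {
            deleted.add(topic);
        }
    }
    return deleted;
}
```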






[GitHub] [kafka] jolshan commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread via GitHub


jolshan commented on PR #13768:
URL: https://github.com/apache/kafka/pull/13768#issuecomment-1565102014

   Thanks for the PR! This looks promising. As Ismael said, let's make the change in
trunk first.





[GitHub] [kafka] gharris1727 opened a new pull request, #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-05-26 Thread via GitHub


gharris1727 opened a new pull request, #13771:
URL: https://github.com/apache/kafka/pull/13771

   The DelegatingClassLoader has a large number of fields and methods for 
keeping track of known PluginDesc objects. It has this in common with the 
PluginScanResult data object, which has a similar set of fields and methods. On 
trunk, the PluginScanResult is only used as a return value of one internal 
function, before its results are accumulated by the DCL and discarded. To 
simplify the DCL, we should use the PluginScanResult object as a container to 
store and accumulate PluginDesc objects, and return it to the caller to allow 
them to inspect the scanned plugins without interacting with the DCL.
   
   Using the PluginScanResult as an accumulator, we can delay writing scan 
results to the pluginLoaders and aliases fields until after all scanning has 
taken place. This is done via the new installDiscoveredPlugins method. This 
prevents plugins being scanned from having an inconsistent delegation path: 
Previously, because the accumulators were updated as scanning proceeded, 
plugins may or may not be able to see one another via the 
DelegatingClassLoader::loadClass method. Now, plugins will _not_ be able to see 
one another before installDiscoveredPlugins is applied and scanning is finished.
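   A rough sketch of that accumulate-then-install shape, using simplified stand-in types (only the installDiscoveredPlugins name comes from this PR description; everything else is illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: simplified stand-ins for DelegatingClassLoader internals.
final class PluginScanAccumulator {
    private final List<String> discovered = new ArrayList<>();   // stands in for PluginDesc objects
    private final Map<String, String> aliases = new HashMap<>(); // only populated at install time

    // During scanning, results are only accumulated; nothing becomes visible to
    // loadClass-style lookups yet.
    void accumulate(String pluginClassName) {
        discovered.add(pluginClassName);
    }

    // After all scanning completes, results are installed in one step, so plugins
    // never observe a half-populated delegation path.
    void installDiscoveredPlugins() {
        for (String plugin : discovered) {
            aliases.put(simpleAlias(plugin), plugin);
        }
    }

    private static String simpleAlias(String className) {
        int dot = className.lastIndexOf('.');
        return dot >= 0 ? className.substring(dot + 1) : className;
    }
}
```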
   
   This is a first-pass refactor, before pulling the scanning logic out of the 
DCL to further simplify it. Once external scanning is complete, the caller will 
call installDiscoveredPlugins to finish initialization of the DCL.
   
   In the trunk implementation, there is some order-sensitivity to the scanning 
process because of the accumulator fields. In particular: 
   1. Because the classpath is scanned last, plugins on the classpath 
automatically take precedence over isolated plugins. In order to replicate this 
effect, PluginDesc objects now explicitly compare their isolated/non-isolated 
nature, and order classpath plugins first.
   2. The alias mechanism attempts to determine if aliases are unique, but 
allows the _first_ usage of an alias to be applied before denying later ones. 
The new mechanism still chooses an arbitrary plugin to get the alias, while 
denying others. I've left a TODO in case we want to discard all conflicted 
aliases instead of allowing an arbitrary choice.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15031) Add plugin.discovery worker configuration

2023-05-26 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15031:
---

 Summary: Add plugin.discovery worker configuration
 Key: KAFKA-15031
 URL: https://issues.apache.org/jira/browse/KAFKA-15031
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


Add the worker configuration plugin.discovery as described in KIP-898.





[jira] [Created] (KAFKA-15030) Add connect-plugin-path command line tool

2023-05-26 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15030:
---

 Summary: Add connect-plugin-path command line tool
 Key: KAFKA-15030
 URL: https://issues.apache.org/jira/browse/KAFKA-15030
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect, tools
Reporter: Greg Harris
Assignee: Greg Harris


Add the connect-plugin-path command line script as described in KIP-898





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We don't do verification on non-client origin requests  






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   I suppose I should confirm we won't have an issue when the verification 
state is blank -- I will make sure of this.






[jira] [Assigned] (KAFKA-14991) Improving Producer's record timestamp validation

2023-05-26 Thread Mehari Beyene (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mehari Beyene reassigned KAFKA-14991:
-

Assignee: Mehari Beyene

> Improving Producer's record timestamp validation
> 
>
> Key: KAFKA-14991
> URL: https://issues.apache.org/jira/browse/KAFKA-14991
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: Mehari Beyene
>Assignee: Mehari Beyene
>Priority: Minor
>
> When time-based retention is configured, the timestamp provided by the 
> producer is used by default to determine the retention period of the log. 
> Customers have the option of changing the configuration to use the broker's 
> timestamp by overriding the configuration for "log.message.timestamp.type", 
> but by default, the producer's timestamp is used. The producer's record 
> timestamp can be in the past or future. Kafka determines the retention time 
> of the log by comparing the broker's time with the record's time.
> Arguably, there can be use cases for a producer to send records with 
> timestamps that are in the past (for example, for replaying old data), but it 
> is inaccurate for records to have a timestamp that is far in the future 
> compared to the broker's current time.
> There is a configurable property called "message.timestamp.difference.max.ms" 
> that customers can use to control the allowed time difference between the 
> broker's current time and the record timestamp. However, the validation from 
> the Kafka code side can be improved by rejecting records with future 
> timestamps from being written in the first place.
> Customers have run into this issue in the past where a producer is configured 
> erroneously to set the record timestamp in nanoseconds instead of 
> milliseconds, resulting in a record timestamp that is in the future, and the 
> time-based retention policy did not kick in as expected.
> The improvement I am proposing is to add basic validation in 
> org.apache.kafka.storage.internals.log.LogValidator to reject record 
> timestamps that are in the future compared to the broker current timestamp 
> after accounting for a sensible tolerance for potential clock skew.
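As a concrete illustration of the proposed check (a minimal sketch; maxFutureSkewMs is a hypothetical clock-skew tolerance, not an existing Kafka configuration):

{code:java}
import org.apache.kafka.common.errors.InvalidTimestampException;

// Minimal sketch of the proposed validation; maxFutureSkewMs is a hypothetical
// tolerance for clock skew, not an existing Kafka configuration.
static void validateNotTooFarInFuture(long recordTimestampMs, long brokerTimeMs, long maxFutureSkewMs) {
    if (recordTimestampMs > brokerTimeMs + maxFutureSkewMs) {
        throw new InvalidTimestampException("Record timestamp " + recordTimestampMs
            + " ms is beyond the broker time " + brokerTimeMs
            + " ms plus the allowed clock skew of " + maxFutureSkewMs + " ms");
    }
}
{code}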





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We currently don't. 
   ~I guess there is an argument that we could have a marker come in before a 
fetch response.~ 
   EDIT: if we don't replicate in order we have a problem, so I don't think we 
need to cover this.
   
   
   I'm not sure how this would be implemented. 






[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

2023-05-26 Thread via GitHub


mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -145,44 +144,47 @@ class ZkMigrationClient(
 topicClient.iterateTopics(
   util.EnumSet.allOf(classOf[TopicVisitorInterest]),
   new TopicVisitor() {
-  override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
-if (!topicBatch.isEmpty) {
-  recordConsumer.accept(topicBatch)
-  topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-}
+override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
+  if (!topicBatch.isEmpty) {
+recordConsumer.accept(topicBatch)
+topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+  }
 
-topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-  .setName(topicName)
-  .setTopicId(topicId), 0.toShort))
-  }
+  topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+.setName(topicName)
+.setTopicId(topicId), 0.toShort))
 
-  override def visitPartition(topicIdPartition: TopicIdPartition, 
partitionRegistration: PartitionRegistration): Unit = {
-val record = new PartitionRecord()
-  .setTopicId(topicIdPartition.topicId())
-  .setPartitionId(topicIdPartition.partition())
-  
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-  
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-  
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-  .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-  .setLeader(partitionRegistration.leader)
-  .setLeaderEpoch(partitionRegistration.leaderEpoch)
-  .setPartitionEpoch(partitionRegistration.partitionEpoch)
-  
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-  }
+  // This breaks the abstraction a bit, but the topic configs belong 
in the topic batch

Review Comment:
   I was considering the fact that we don't atomically apply the migration 
records during the migration. I think it's possible for the controller or 
broker to publish the migration metadata before it's all committed. In this 
case, I think it's probably safer to include the config records with the topic 
batch.
   
   This won't be an issue once we implement KIP-868.






[GitHub] [kafka] jolshan opened a new pull request, #13770: MINOR: Add config to producerStateManager config

2023-05-26 Thread via GitHub


jolshan opened a new pull request, #13770:
URL: https://github.com/apache/kafka/pull/13770

   Originally part of https://github.com/apache/kafka/pull/13608/files. Since 
there are so many files changed, I decided to just pull this out into its own 
PR.
   
   I have moved this config into producer state manager so it can be checked 
easily under the log lock when we are about to append.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan opened a new pull request, #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-05-26 Thread via GitHub


jolshan opened a new pull request, #13769:
URL: https://github.com/apache/kafka/pull/13769

   Originally part of https://github.com/apache/kafka/pull/13608. Artem made a
good point that this change was unrelated, so I'm making a minor PR to cover it.
   
   Cleaning up the AddPartitionsToTxnManager and covering the 3 epoch cases 
more clearly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207395237


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   I agree with you that 5 minutes is way too long to "mute" broker removals. 
That was a miss in the original PR. (As a side note, broker session is not 18 
seconds. It is 9 seconds.)
   
   I thought about this more and I think we may not need the overload state I 
originally wanted to introduce at all. We can simply do some basic processing 
on the heartbeat when we time it out. Specifically, we can update the "last 
seen time" of the broker. This will avoid the "congestion meltdown" behavior 
without introducing a new state.
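   If it helps to visualize, a minimal sketch of that idea (placeholder types and names, not the actual BrokerHeartbeatManager API):

```java
import java.util.HashMap;
import java.util.Map;

// Placeholder types and names; not the actual BrokerHeartbeatManager API.
final class HeartbeatTimeoutHandling {
    static final class BrokerState {
        long lastSeenNs;
    }

    private final Map<Integer, BrokerState> brokers = new HashMap<>();

    // When a queued heartbeat times out instead of being fully processed, still
    // bump the broker's last-seen time so controller overload does not cascade
    // into fencing every broker.
    void onHeartbeatTimedOut(int brokerId, long nowNs) {
        brokers.computeIfAbsent(brokerId, id -> new BrokerState()).lastSeenNs = nowNs;
    }
}
```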






[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207382044


##
server-common/src/main/java/org/apache/kafka/server/metrics/WindowedEventCounter.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.metrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Counts the number of events that happen within a given time window. Not 
thread safe.
+ */
+public final class WindowedEventCounter {
+private final long[] eventTimes;
+
+private final long window;
+
+private int startIndex;
+
+private int endIndex;
+
+public WindowedEventCounter(
+int maxCount,
+long window
+) {

Review Comment:
   You can use any units you'd like here. For example, it would be possible to
use either ms or ns.
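   For instance, something like the following (hypothetical usage: record(...) is an assumed method name, since only the constructor appears in the diff above; the point is just that the window and the timestamps need to share a unit):

```java
import java.util.concurrent.TimeUnit;

void example() {
    // Window and timestamps in milliseconds.
    WindowedEventCounter msCounter = new WindowedEventCounter(1000, TimeUnit.MINUTES.toMillis(5));
    msCounter.record(System.currentTimeMillis());   // record(...) is an assumed method name

    // Window and timestamps in nanoseconds.
    WindowedEventCounter nsCounter = new WindowedEventCounter(1000, TimeUnit.MINUTES.toNanos(5));
    nsCounter.record(System.nanoTime());
}
```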






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
   // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+  val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
   new AddPartitionsToTxnTransactionCollection(1),
   mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+  val existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
-  // reconnected so return the retriable network exception.
-  if (currentTransactionData != null) {
-val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
-  Errors.INVALID_PRODUCER_EPOCH
-else 
-  Errors.NETWORK_EXCEPTION
-val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-currentTransactionData.topics().forEach { topic =>
-  topic.partitions().forEach { partition =>
-topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
-  }
+  // There are 3 cases if we already have existing data
+  // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH 
for existing data since it is fenced
+  // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for 
existing data, since the client is likely retrying and we want another 
retriable exception 
+  // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH 
for the incoming data since it is fenced, do not add incoming data to verify
+  if (existingTransactionData != null) {
+if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {
+val error = if (existingTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else
+  Errors.NETWORK_EXCEPTION
+  val oldCallback = 
existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+  
existingNodeAndTransactionData.transactionData.remove(transactionData)
+  oldCallback(topicPartitionsToError(existingTransactionData, error))
+} else {
+  // If the incoming transactionData's epoch is lower, we can return 
with INVALID_PRODUCER_EPOCH immediately.
+  callback(topicPartitionsToError(transactionData, 
Errors.INVALID_PRODUCER_EPOCH))
+  return
 }
-val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-currentNodeAndTransactionData.transactionData.remove(transactionData)
-oldCallback(topicPartitionsToError.toMap)
   }
-  currentNodeAndTransactionData.transactionData.add(transactionData)
-  
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+

Review Comment:
   I'll separate this out into a new PR as I'm already splitting this up.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207333185


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
   val sTime = time.milliseconds
   
   val transactionalProducerIds = mutable.HashSet[Long]()
+  var verificationState: Optional[VerificationState] = Optional.empty()

Review Comment:
   I don't think we would get a new one. We only need one per transaction 
right? 
   So either we succeed and no longer need to worry about the verification, or
the partition is retried and gets the same verification object from the first
time.
   
   We do have a test with multiple partitions but I would have to check if it 
also checks the verification state.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207330097


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
   // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+  val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
   new AddPartitionsToTxnTransactionCollection(1),
   mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+  val existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
-  // reconnected so return the retriable network exception.
-  if (currentTransactionData != null) {
-val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
-  Errors.INVALID_PRODUCER_EPOCH
-else 
-  Errors.NETWORK_EXCEPTION
-val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-currentTransactionData.topics().forEach { topic =>
-  topic.partitions().forEach { partition =>
-topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
-  }
+  // There are 3 cases if we already have existing data
+  // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH 
for existing data since it is fenced
+  // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for 
existing data, since the client is likely retrying and we want another 
retriable exception 
+  // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH 
for the incoming data since it is fenced, do not add incoming data to verify
+  if (existingTransactionData != null) {
+if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {
+val error = if (existingTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else
+  Errors.NETWORK_EXCEPTION
+  val oldCallback = 
existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+  
existingNodeAndTransactionData.transactionData.remove(transactionData)
+  oldCallback(topicPartitionsToError(existingTransactionData, error))
+} else {
+  // If the incoming transactionData's epoch is lower, we can return 
with INVALID_PRODUCER_EPOCH immediately.
+  callback(topicPartitionsToError(transactionData, 
Errors.INVALID_PRODUCER_EPOCH))
+  return
 }
-val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-currentNodeAndTransactionData.transactionData.remove(transactionData)
-oldCallback(topicPartitionsToError.toMap)
   }
-  currentNodeAndTransactionData.transactionData.add(transactionData)
-  
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+

Review Comment:
   yes - we didn't really cover these epoch cases before, and I thought it 
would be good to include them for completeness.
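   For readers skimming the diff above, the three cases reduce to a comparison like the following (a simplified illustration, not the PR's actual code):

```java
import org.apache.kafka.common.protocol.Errors;

// Simplified illustration of the three epoch cases described in the comment above.
static Errors classifyExistingVsIncoming(short existingEpoch, short incomingEpoch) {
    if (incomingEpoch > existingEpoch) {
        // Case 1: incoming epoch is higher -- the existing attempt is fenced.
        return Errors.INVALID_PRODUCER_EPOCH;   // returned for the existing data
    } else if (incomingEpoch == existingEpoch) {
        // Case 2: same epoch -- likely a client retry, so fail the existing attempt retriably.
        return Errors.NETWORK_EXCEPTION;        // returned for the existing data
    } else {
        // Case 3: incoming epoch is lower -- the incoming attempt itself is fenced
        // and is not added for verification.
        return Errors.INVALID_PRODUCER_EPOCH;   // returned for the incoming data
    }
}
```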






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We currently don't. 
   I guess there is an argument that we could have a marker come in before a 
fetch response. 
   
   I'm not sure how this would be implemented. 






[GitHub] [kafka] cmccabe closed pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions

2023-05-26 Thread via GitHub


cmccabe closed pull request #13766: KAFKA-14996: Limit partition count in 
Create Topic and Create Partitions
URL: https://github.com/apache/kafka/pull/13766





[GitHub] [kafka] cmccabe commented on pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions

2023-05-26 Thread via GitHub


cmccabe commented on PR #13766:
URL: https://github.com/apache/kafka/pull/13766#issuecomment-1564905334

   Thanks @edoardocomar. Closing as a duplicate of #13742.
   
   If you're interested in doing more here, check out the discussion on the 
other PR about a possible KIP we could have (to add some configurable limits)





[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1206199912


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,10 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, 
baseSequence: Int): Optional[VerificationState] = lock synchronized {
+val entry = producerStateManager.entryForVerification(producerId, 
producerEpoch, baseSequence)
+if (entry.currentTxnFirstOffset.isPresent) {

Review Comment:
   Maybe add a comment explaining why we don't need verification if
currentTxnFirstOffset.isPresent.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
 return false;
 }
 }
+
+public void maybeAddVerificationState() {
+// If we already have a verification state, we can reuse it. This is 
because we know this is the same transaction 
+// as the state is cleared upon writing a control marker.
+if (!this.verificationState.isPresent())
+this.verificationState = Optional.of(new VerificationState());

Review Comment:
   This creates an object of class Optional that points to an object of class
VerificationState, so we get an extra object for every producer entry. We
just need a plain Object value to avoid the extra overhead.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   Do we ever go through verification logic for followers?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction state

Review Comment:
   Maybe add more comments on how we validate transaction state.  Also maybe 
not here but at least somewhere we should have a detailed comment about the 
race condition we're addressing and specifically how the verificationState 
solves the ABA problem.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationState.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class serves as a unique object to ensure the same transaction is 
being verified.
+ * When verification starts, this object is created and checked before append 
to ensure the producer state entry
+ * is not modified (via ending the transaction) before the record is appended.
+ */
+public class VerificationState {

Review Comment:
   Do we need a separate class for that?  I think we could just use Object, 
because we just compare references, not values.
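   A small sketch of what that suggestion could look like (illustrative names, not the PR's code): a plain Object sentinel handed out when verification starts and compared by reference at append time, so a new transaction (new sentinel) fails the check.

```java
// Sketch of the suggestion: a plain Object sentinel compared by reference.
// Field and method names here are illustrative, not the PR's exact code.
class ProducerEntrySketch {
    private Object verificationSentinel; // null means no verification in flight

    // Reuse the existing sentinel if verification is already in flight.
    Object maybeStartVerification() {
        if (verificationSentinel == null) {
            verificationSentinel = new Object();
        }
        return verificationSentinel;
    }

    // At append time, the caller passes back the sentinel it captured earlier.
    // Reference equality detects whether the transaction state changed in between
    // (the ABA concern), since a new transaction gets a brand-new sentinel.
    boolean stillSameTransaction(Object captured) {
        return captured != null && captured == verificationSentinel;
    }

    void onTransactionEnd() {
        verificationSentinel = null; // cleared when a control marker is written
    }
}
```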



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
   // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+  val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
   new AddPartitionsToTxnTransactionCollection(1),
   mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-  val currentTransactionData = 

[GitHub] [kafka] ijuma commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread via GitHub


ijuma commented on PR #13768:
URL: https://github.com/apache/kafka/pull/13768#issuecomment-1564903842

   We typically make changes to master first. Would you be willing to submit a 
PR for that instead?





[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564904075

   Thanks for all the reviews, and thanks @mumrah for the LGTM. Since this is a 
3.5 blocker I am getting it in today so that it will be in the next RC.
   
   As I said before, this doesn't add any new limits, but just prevents damage 
to the controller when the existing limits are exceeded. However, the 
discussion about limits here was good. I think we should consider a follow-on 
KIP to make the maximum number of records per user operation configurable, and 
possibly add a configurable partitions_max as well.





[GitHub] [kafka] cmccabe merged pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe merged PR #13742:
URL: https://github.com/apache/kafka/pull/13742





[jira] [Comment Edited] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread Ruslan Scherbakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726728#comment-17726728
 ] 

Ruslan Scherbakov edited comment on KAFKA-9693 at 5/26/23 8:05 PM:
---

Related pull request: [https://github.com/apache/kafka/pull/13768]

The issue with repeating latency spikes during Kafka log segment rolling is still
reproduced on the latest versions, including kafka_2.13-3.4.0.

It was found that flushing the Kafka producer snapshot file during segment rolling
blocks the producer request handling thread for some time:
[https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452]
{code:java}
  private def writeSnapshot(file: File, entries: mutable.Map[Long, 
ProducerStateEntry]): Unit = {
...
    val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
    try {
      fileChannel.write(buffer)
      fileChannel.force(true)     <- here
    } finally {
      fileChannel.close()
    }...{code}
More partitions - more cumulative latency effect observed.

The suggested fix offloads the flush (fileChannel.force) operation to a background
thread, similar to (but not exactly) how it is done in UnifiedLog.scala:
{code:java}
  def roll(
   ...
    // Schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
  }{code}
Benchmarking with this fix shows a significant reduction in the repeating
latency spikes:

*Test config:*

AWS
3 node cluster (i3en.2xlarge)
zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
1 loadgen (m5n.8xlarge) - OpenMessaging benchmark 
([OMB|https://github.com/openmessaging/benchmark])
1 zookeeper (t2.small)
acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 
producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m

*variation 1:*

partitions=10
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|2.00|2.00|
|endToEnd service_time (ms) p75 max|3.00|2.00|
|endToEnd service_time (ms) p95 max|94.0|3.00|
|endToEnd service_time (ms) p99 max|290|6.00|
|endToEnd service_time (ms) p99.9 max|355|21.0|
|endToEnd service_time (ms) p99.99 max|372|34.0|
|endToEnd service_time (ms) p100 max|374|36.0|
|publish service_time (ms) p50 max|1.70|1.67|
|publish service_time (ms) p75 max|2.23|2.09|
|publish service_time (ms) p95 max|90.7|2.82|
|publish service_time (ms) p99 max|287|4.69|
|publish service_time (ms) p99.9 max|353|19.6|
|publish service_time (ms) p99.99 max|369|31.3|
|publish service_time (ms) p100 max|371|33.5|

 
||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png!|https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png!|https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png]|

latency improved up to 10x in the high percentiles ^^^, spikes almost
invisible

*variation 2:*

partitions=100
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|91.0|2.00|
|endToEnd service_time (ms) p75 max|358|3.00|
|endToEnd service_time (ms) p95 max|1814|4.00|
|endToEnd service_time (ms) p99 max|2777|21.0|
|endToEnd service_time (ms) p99.9 max|3643|119|
|endToEnd service_time (ms) p99.99 max|3724|141|
|endToEnd service_time (ms) p100 max|3726|143|
|publish service_time (ms) p50 max|77.4|1.92|
|publish service_time (ms) p75 max|352|2.35|
|publish service_time (ms) p95 max|1748|3.80|
|publish service_time (ms) p99 max|2740|18.9|
|publish service_time (ms) p99.9 max|3619|116|
|publish service_time (ms) p99.99 max|3720|139|
|publish service_time (ms) p100 max|3722|141|
|endToEnd service_time| | |

 
||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png!|https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png!|https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png]|

latency improved up to 25x in the high percentiles ^^^

The fix was done on the 3.4 branch (the Scala version of ProducerStateManager).
Trunk needs a corresponding fix in ProducerStateManager.java.
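A rough Java sketch of what a corresponding trunk-side change could look like (illustrative only, not the actual ProducerStateManager.java patch; the executor stands in for Kafka's background scheduler):

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;

// Illustrative only: write the snapshot on the request thread, but move the blocking
// fsync (force) to a background executor so segment roll does not stall producers.
static void writeSnapshotWithAsyncFlush(File file, ByteBuffer buffer, ExecutorService flushExecutor)
        throws IOException {
    FileChannel fileChannel = FileChannel.open(file.toPath(),
            StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    boolean handedOff = false;
    try {
        fileChannel.write(buffer);
        flushExecutor.submit(() -> {
            try {
                fileChannel.force(true);   // the call that previously blocked the request thread
            } catch (IOException e) {
                // log and surface through the usual error handling in a real implementation
            } finally {
                try {
                    fileChannel.close();
                } catch (IOException ignored) {
                }
            }
        });
        handedOff = true;
    } finally {
        if (!handedOff) {
            fileChannel.close();
        }
    }
}
{code}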


was (Author: novosibman):
Related pull request: [https://github.com/apache/kafka/pull/13768]

The issue with repeating latency spikes during Kafka log segments rolling still 
reproduced on the latest versions including 

[jira] [Comment Edited] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread Ruslan Scherbakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726728#comment-17726728
 ] 

Ruslan Scherbakov edited comment on KAFKA-9693 at 5/26/23 8:00 PM:
---

Related pull request: [https://github.com/apache/kafka/pull/13768]

The issue with repeating latency spikes during Kafka log segments rolling still 
reproduced on the latest versions including kafka_2.13-3.4.0.

It was found that flushing Kafka snapshot file during segments rolling blocks 
producer request handling thread for some time:
[https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452]
{code:java}
  private def writeSnapshot(file: File, entries: mutable.Map[Long, 
ProducerStateEntry]): Unit = {
...
    val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
    try {
      fileChannel.write(buffer)
      fileChannel.force(true)     <- here
    } finally {
      fileChannel.close()
    }...{code}
More partitions - more cumulative latency effect observed.

Suggested fix offloads flush (fileChannel.force) operation to the background 
thread similar to (but not exactly) how it was done in the UnifiedLog.scala:
{code:java}
  def roll(
   ...
    // Schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
  }{code}
The benchmarking using this fix shows significant reduction in repeating 
latency spikes:

*Test config:*

AWS
3 node cluster (i3en.2xlarge)
zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
1 loadgen (m5n.8xlarge) - OpenMessaging benchmark 
([OMB|https://github.com/openmessaging/benchmark])
1 zookeeper (t2.small)
acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 
producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m

*variation 1:*

partitions=10
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|2.00|2.00|
|endToEnd service_time (ms) p75 max|3.00|2.00|
|endToEnd service_time (ms) p95 max|94.0|3.00|
|endToEnd service_time (ms) p99 max|290|6.00|
|endToEnd service_time (ms) p99.9 max|355|21.0|
|endToEnd service_time (ms) p99.99 max|372|34.0|
|endToEnd service_time (ms) p100 max|374|36.0|
|publish service_time (ms) p50 max|1.70|1.67|
|publish service_time (ms) p75 max|2.23|2.09|
|publish service_time (ms) p95 max|90.7|2.82|
|publish service_time (ms) p99 max|287|4.69|
|publish service_time (ms) p99.9 max|353|19.6|
|publish service_time (ms) p99.99 max|369|31.3|
|publish service_time (ms) p100 max|371|33.5|
||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png|width=703,height=281!|https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png|width=700,height=280!|https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png]|

latency score improved up to 10x times in high percentiles ^^^, spikes almost 
invisible

*variation 2:*

partitions=100
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|91.0|2.00|
|endToEnd service_time (ms) p75 max|358|3.00|
|endToEnd service_time (ms) p95 max|1814|4.00|
|endToEnd service_time (ms) p99 max|2777|21.0|
|endToEnd service_time (ms) p99.9 max|3643|119|
|endToEnd service_time (ms) p99.99 max|3724|141|
|endToEnd service_time (ms) p100 max|3726|143|
|publish service_time (ms) p50 max|77.4|1.92|
|publish service_time (ms) p75 max|352|2.35|
|publish service_time (ms) p95 max|1748|3.80|
|publish service_time (ms) p99 max|2740|18.9|
|publish service_time (ms) p99.9 max|3619|116|
|publish service_time (ms) p99.99 max|3720|139|
|publish service_time (ms) p100 max|3722|141|
|endToEnd service_time| | |
||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png|width=645,height=258!|https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png|width=643,height=257!|https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png]|

latency score improved up to 25x times in high percentiles ^^^

The fix was done for 3.4 branch - scala version of ProducerStateManager. Trunk 
needs corresponding fix for ProducerStateManager.java.


was (Author: novosibman):
Related pull request: https://github.com/apache/kafka/pull/13768

The issue with repeating latency spikes during Kafka log 

[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread Ruslan Scherbakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726728#comment-17726728
 ] 

Ruslan Scherbakov commented on KAFKA-9693:
--

Related pull request: https://github.com/apache/kafka/pull/13768

The issue with repeating latency spikes during Kafka log segments rolling still 
reproduced on the latest versions including kafka_2.13-3.4.0.

It was found that flushing Kafka snapshot file during segments rolling blocks 
producer request handling thread for some time:
[https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452]

 
{code:java}
  private def writeSnapshot(file: File, entries: mutable.Map[Long, 
ProducerStateEntry]): Unit = {
...
    val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
    try {
      fileChannel.write(buffer)
      fileChannel.force(true)     <- here
    } finally {
      fileChannel.close()
    }...{code}
More partitions - more cumulative latency effect observed.

 


Suggested fix offloads flush (fileChannel.force) operation to the background 
thread similar to (but not exactly) how it was done in the UnifiedLog.scala:

 
{code:java}
  def roll(
   ...
    // Schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
  }{code}
 

 

The benchmarking using this fix shows significant reduction in repeating 
latency spikes:
Test config:
AWS
3 node cluster (i3en.2xlarge)
zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
1 loadgen (m5n.8xlarge) - OpenMessaging benchmark 
([OMB|https://github.com/openmessaging/benchmark])
1 zookeeper (t2.small)
acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 
producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m
h3. variation 1:

partitions=10
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|2.00|2.00|
|endToEnd service_time (ms) p75 max|3.00|2.00|
|endToEnd service_time (ms) p95 max|94.0|3.00|
|endToEnd service_time (ms) p99 max|290|6.00|
|endToEnd service_time (ms) p99.9 max|355|21.0|
|endToEnd service_time (ms) p99.99 max|372|34.0|
|endToEnd service_time (ms) p100 max|374|36.0|
|publish service_time (ms) p50 max|1.70|1.67|
|publish service_time (ms) p75 max|2.23|2.09|
|publish service_time (ms) p95 max|90.7|2.82|
|publish service_time (ms) p99 max|287|4.69|
|publish service_time (ms) p99.9 max|353|19.6|
|publish service_time (ms) p99.99 max|369|31.3|
|publish service_time (ms) p100 max|371|33.5|

||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png|width=703,height=281!|https://user-images.githubusercontent.com/6793713/241306935-ec329711-47d4-459f-92d7-06310b770023.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png|width=700,height=280!|https://user-images.githubusercontent.com/6793713/241307047-e5aeb6a6-d33a-4d57-be80-c916fa1f05be.png]|

latency score improved up to 10x times in high percentiles ^^^, spikes almost 
invisible
h3. variation 2:

partitions=100
||metric||kafka_2.13-3.4.0||kafka_2.13-3.4.0 patched||
|endToEnd service_time (ms) p50 max|91.0|2.00|
|endToEnd service_time (ms) p75 max|358|3.00|
|endToEnd service_time (ms) p95 max|1814|4.00|
|endToEnd service_time (ms) p99 max|2777|21.0|
|endToEnd service_time (ms) p99.9 max|3643|119|
|endToEnd service_time (ms) p99.99 max|3724|141|
|endToEnd service_time (ms) p100 max|3726|143|
|publish service_time (ms) p50 max|77.4|1.92|
|publish service_time (ms) p75 max|352|2.35|
|publish service_time (ms) p95 max|1748|3.80|
|publish service_time (ms) p99 max|2740|18.9|
|publish service_time (ms) p99.9 max|3619|116|
|publish service_time (ms) p99.99 max|3720|139|
|publish service_time (ms) p100 max|3722|141|
|endToEnd service_time| | |

||kafka||endToEnd chart||
|kafka_2.13-3.4.0|[!https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png|width=645,height=258!|https://user-images.githubusercontent.com/6793713/241307517-dc6e9820-c3b7-4bd0-8dac-bce9f3886d91.png]|
|kafka_2.13-3.4.0 
patched|[!https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png|width=643,height=257!|https://user-images.githubusercontent.com/6793713/241307546-113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5.png]|

latency score improved up to 25x times in high percentiles ^^^

The fix was done for 3.4 branch - scala version of ProducerStateManager. Trunk 
needs corresponding fix for ProducerStateManager.java.

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> 

[GitHub] [kafka] novosibman opened a new pull request, #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-26 Thread via GitHub


novosibman opened a new pull request, #13768:
URL: https://github.com/apache/kafka/pull/13768

   Related issue https://issues.apache.org/jira/browse/KAFKA-9693
   
   The issue with repeating latency spikes during Kafka log segment rolling is still
reproduced on the latest versions, including kafka_2.13-3.4.0.
   
   It was found that flushing the Kafka producer snapshot file during segment rolling
blocks the producer request handling thread for some time:
   
https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/ProducerStateManager.scala#L452
   ```
   private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerStateEntry]): Unit = {
     ...
     val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
     try {
       fileChannel.write(buffer)
       fileChannel.force(true) // <- here
     } finally {
       fileChannel.close()
     }
     ...
   ```
   More partitions - more cumulative latency effect observed.
   
   The suggested fix offloads the flush (fileChannel.force) operation to a background thread, similar to (but not exactly) how it is done in UnifiedLog.scala:
   ```
   def roll(
     ...
     // Schedule an asynchronous flush of the old segment
     scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
   }
   ```
   Benchmarking with this fix shows a significant reduction in the repeating latency spikes:
   
   test config: 
   AWS
   3 node cluster (i3en.2xlarge)
   zulu11.62.17-ca-jdk11.0.18-linux_x64, heap 6G per broker
   1 loadgen (m5n.8xlarge) - OpenMessaging benchmark 
([OMB](https://github.com/openmessaging/benchmark)) 
   1  zookeeper (t2.small)
   acks=all batchSize=1048510 consumers=4 insyncReplicas=2 lingerMs=1 mlen=1024 
producers=4 rf=3 subscriptions=1 targetRate=200k time=12m topics=1 warmup=1m 
   
   ### variation 1:
   partitions=10 
   
   
   metric | kafka_2.13-3.4.0 | kafka_2.13-3.4.0 patched
   -- | -- | --
   endToEnd service_time (ms) p50 max | 2.00 | 2.00 
   endToEnd service_time (ms) p75 max | 3.00 | 2.00 
   endToEnd service_time (ms) p95 max | 94.0 | 3.00 
   endToEnd service_time (ms) p99 max | 290 | 6.00  
   endToEnd service_time (ms) p99.9 max | 355 | 21.0
   endToEnd service_time (ms) p99.99 max | 372 | 34.0
   endToEnd service_time (ms) p100 max | 374 | 36.0 
   publish service_time (ms) p50 max | 1.70 | 1.67 
   publish service_time (ms) p75 max | 2.23 | 2.09 
   publish service_time (ms) p95 max | 90.7 | 2.82 
   publish service_time (ms) p99 max | 287 | 4.69  
   publish service_time (ms) p99.9 max | 353 | 19.6
   publish service_time (ms) p99.99 max | 369 | 31.3
   publish service_time (ms) p100 max | 371 | 33.5 
   
   kafka | endToEnd chart
   -- | --
   kafka_2.13-3.4.0 | ![image](https://github.com/apache/kafka/assets/6793713/ec329711-47d4-459f-92d7-06310b770023)
   kafka_2.13-3.4.0 patched | ![image](https://github.com/apache/kafka/assets/6793713/e5aeb6a6-d33a-4d57-be80-c916fa1f05be)
   
   Latency scores improved by up to 10x in the high percentiles ^^^; spikes are almost invisible.
   
   ### variation 2:
   partitions=100
   metric | kafka_2.13-3.4.0 | kafka_2.13-3.4.0 patched
   -- | -- | --
   endToEnd service_time (ms) p50 max | 91.0 | 2.00 
   endToEnd service_time (ms) p75 max | 358 | 3.00 
   endToEnd service_time (ms) p95 max | 1814 | 4.00
   endToEnd service_time (ms) p99 max | 2777 | 21.0
   endToEnd service_time (ms) p99.9 max | 3643 | 119
   endToEnd service_time (ms) p99.99 max | 3724 | 141
   endToEnd service_time (ms) p100 max | 3726 | 143  
   publish service_time (ms) p50 max | 77.4 | 1.92 
   publish service_time (ms) p75 max | 352 | 2.35  
   publish service_time (ms) p95 max | 1748 | 3.80
   publish service_time (ms) p99 max | 2740 | 18.9
   publish service_time (ms) p99.9 max | 3619 | 116
   publish service_time (ms) p99.99 max | 3720 | 139
   publish service_time (ms) p100 max | 3722 | 141
   
   kafka | endToEnd chart
   -- | --
   kafka_2.13-3.4.0 | ![image](https://github.com/apache/kafka/assets/6793713/dc6e9820-c3b7-4bd0-8dac-bce9f3886d91)
   kafka_2.13-3.4.0 patched | ![image](https://github.com/apache/kafka/assets/6793713/113b4480-97a4-4dd5-8d5c-f7dc87a3d7a5)
   
   Latency scores improved by up to 25x in the high percentiles ^^^.
   
   The fix was done on the 3.4 branch (the Scala version of ProducerStateManager); trunk needs a corresponding fix for ProducerStateManager.java.
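
   For illustration only, here is a minimal Java sketch of the same idea for trunk
   (write the snapshot synchronously, but move the fsync off the request-handling
   thread). The class and method names are made up and this is not the actual
   ProducerStateManager.java change:
   ```
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Hypothetical helper: the write stays on the caller's thread, the force() does not.
   public final class AsyncSnapshotWriter {
       private final ExecutorService flusher = Executors.newSingleThreadExecutor();

       public void writeSnapshot(Path file, ByteBuffer buffer) throws IOException {
           FileChannel channel = FileChannel.open(file,
               StandardOpenOption.CREATE, StandardOpenOption.WRITE);
           channel.write(buffer);    // cheap page-cache write stays on the hot path
           flusher.submit(() -> {    // expensive fsync runs in the background
               try (FileChannel c = channel) {
                   c.force(true);
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           });
       }
   }
   ```
   As with the UnifiedLog roll shown above, error handling and ordering with snapshot
   deletion/truncation would need the same care as the synchronous path.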
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207234024


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -194,23 +195,44 @@ void handleTopicsDelta(Function 
deletedTopicNameResolver, TopicsDe
 }
 
 void handleConfigsSnapshot(ConfigurationsImage configsImage) {
-Set brokersToUpdate = new HashSet<>();
+Set newResources = new 
HashSet<>(configsImage.resourceData().keySet())

Review Comment:
   I think we should throw an exception if we get a ConfigResource whose type 
isn't BROKER or TOPIC.
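
   For illustration, a minimal sketch of such a guard (not the actual
   KRaftMigrationZkWriter code; the class name is made up):
   ```
   import org.apache.kafka.common.config.ConfigResource;

   // Hypothetical guard: only BROKER and TOPIC configs are handled by the ZK dual-write path.
   public final class ConfigResourceTypeGuard {
       public static void checkSupported(ConfigResource resource) {
           switch (resource.type()) {
               case BROKER:
               case TOPIC:
                   break;
               default:
                   throw new IllegalStateException("Unexpected config resource type "
                       + resource.type() + " for resource '" + resource.name() + "'");
           }
       }
   }
   ```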



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207225622


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -145,44 +144,47 @@ class ZkMigrationClient(
 topicClient.iterateTopics(
   util.EnumSet.allOf(classOf[TopicVisitorInterest]),
   new TopicVisitor() {
-  override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
-if (!topicBatch.isEmpty) {
-  recordConsumer.accept(topicBatch)
-  topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-}
+override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
+  if (!topicBatch.isEmpty) {
+recordConsumer.accept(topicBatch)
+topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+  }
 
-topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-  .setName(topicName)
-  .setTopicId(topicId), 0.toShort))
-  }
+  topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+.setName(topicName)
+.setTopicId(topicId), 0.toShort))
 
-  override def visitPartition(topicIdPartition: TopicIdPartition, 
partitionRegistration: PartitionRegistration): Unit = {
-val record = new PartitionRecord()
-  .setTopicId(topicIdPartition.topicId())
-  .setPartitionId(topicIdPartition.partition())
-  
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-  
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-  
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-  .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-  .setLeader(partitionRegistration.leader)
-  .setLeaderEpoch(partitionRegistration.leaderEpoch)
-  .setPartitionEpoch(partitionRegistration.partitionEpoch)
-  
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-  }
+  // This breaks the abstraction a bit, but the topic configs belong 
in the topic batch

Review Comment:
   It's not really required for the topic config records to come right after 
the topics. It would be OK to do it in a separate section as we do with 
snapshots.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13757: [WIP] Fix snapshot load during dual write.

2023-05-26 Thread via GitHub


mumrah commented on PR #13757:
URL: https://github.com/apache/kafka/pull/13757#issuecomment-1564750482

   The jira for this is KAFKA-15017 btw


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions

2023-05-26 Thread via GitHub


mumrah commented on PR #13766:
URL: https://github.com/apache/kafka/pull/13766#issuecomment-1564748396

   @edoardocomar, thanks for taking a look at this. This issue is a little bit 
tricky since PartitionRecords are not the only thing inside the topic creation 
batch. There's also a TopicRecord and an unbounded number of ConfigRecords. 
@cmccabe has a related PR #13742 which deals with overly large batches.
   
   Once we implement KIP-868 (which will hopefully be very soon), I don't think 
we want to impose any limits on the partition count except by what is enforced 
by a user-supplied CreateTopicPolicy.
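
   For example, a hypothetical user-supplied policy along those lines might look
   like the sketch below (the 10000 cap is illustrative, not a Kafka default); it
   would be enabled via the `create.topic.policy.class.name` broker config:
   ```
   import java.util.Map;
   import org.apache.kafka.common.errors.PolicyViolationException;
   import org.apache.kafka.server.policy.CreateTopicPolicy;

   // Hypothetical policy: reject topics that request more partitions than we allow.
   public class MaxPartitionsCreateTopicPolicy implements CreateTopicPolicy {
       private static final int MAX_PARTITIONS = 10_000; // illustrative limit

       @Override
       public void validate(CreateTopicPolicy.RequestMetadata requestMetadata) {
           Integer numPartitions = requestMetadata.numPartitions();
           // numPartitions is null when explicit replica assignments are supplied instead
           int requested = numPartitions != null
               ? numPartitions
               : requestMetadata.replicasAssignments().size();
           if (requested > MAX_PARTITIONS) {
               throw new PolicyViolationException("Topic " + requestMetadata.topic()
                   + " requests " + requested + " partitions, above the allowed " + MAX_PARTITIONS);
           }
       }

       @Override
       public void configure(Map<String, ?> configs) { }

       @Override
       public void close() { }
   }
   ```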


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564712814

   Hi all,
   
   Thanks for the reviews and comments.
   
   > @divijvaidya : But I was wondering if an additional guard could be to have 
a default TopicCreationPolicy with a MaxValue of X number of topics per 
requests (X is probably <= 10K). On every topic creation request, we will apply 
the policy and ensure that the request gets rejected upfront before entering 
the system. We could extend this pattern for other use cases where we would 
like to restrict range of certain parameters in the requests. What do you think?
   
   I like this idea, but it would require a KIP to implement.
   
   > @divijvaidya : I am concerned about the user facing aspect of this change. 
If I am a user and get this exception, what am I expected to do to resolve it? 
The documentation does not call out any limitation on max topics that I can 
create in one API call. How do I know what the limit is? The alternative 
approach I proposed above (topic policy limitation) solves this. We can 
document policies with the constraints and follow a similar pattern for other 
out of bound configuration/request params.
   
   I think the thing to keep in mind is that this PR doesn't make any request 
fail that wouldn't have already failed.
   
   > @mumrah : Colin can correct me if I'm mistaken, but I believe this patch 
is mainly about closing an existing edge case until we implement KIP-868 
(metadata transactions). Once we have transactions in the controller, we can 
allow arbitrarily large batches of records.
   
   Yes, that's correct.
   
   > @mumrah : I also wondered if we could solve this in ControllerResult 
rather than in each control manager separately.
   
   I think the issue is that people can request truly gigantic, garbage-collector-killing lists of records to be constructed. You want to cut them off before they build the giant list, not after.
   
   > @mumrah : Will we remove this logic once transactions are implemented?
   
   I think we'll need some kind of limit even with metadata transactions in 
place. But it will be a limit not set by the implementation, but by our policy.
   
   > @edoardocomar : I had just done an alternative smaller PR for the same 
issue
   
   Thank you, Edoardo, I guess we were thinking along the same lines. One thing 
to keep in mind is that the problem is bigger than just CreateTopics. Any 
operation can be too big and cause implementation problems.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1207123242


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -457,9 +466,13 @@ private Throwable handleEventException(String name,
 long endProcessingTime = time.nanoseconds();
 long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
 long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-if (exception instanceof ApiException) {
+if ((exception instanceof ApiException) ||
+(exception instanceof BoundedListTooLongException)) {
 log.info("{}: failed with {} in {} us. Reason: {}", name,
 exception.getClass().getSimpleName(), deltaUs, 
exception.getMessage());
+if (exception instanceof BoundedListTooLongException) {
+exception = new UnknownServerException(exception.getMessage());

Review Comment:
   yeah that's fair.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1207117766


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -160,6 +161,14 @@
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+/**
+ * The maximum records any user-initiated operation is allowed to generate.
+ */
+final static int MAX_RECORDS_PER_USER_OP = 10000;

Review Comment:
   It was chosen to be the same as MAX_RECORDS_PER_BATCH, which is also 10000. I will make this clearer by initializing them to the same values.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -160,6 +161,14 @@
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+/**
+ * The maximum records any user-initiated operation is allowed to generate.
+ */
+final static int MAX_RECORDS_PER_USER_OP = 10000;

Review Comment:
   It was chosen to be the same as MAX_RECORDS_PER_BATCH, which is also 10000. I will make this clearer by initializing one to the other.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


cmccabe commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1207117766


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -160,6 +161,14 @@
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+/**
+ * The maximum records any user-initiated operation is allowed to generate.
+ */
+final static int MAX_RECORDS_PER_USER_OP = 10000;

Review Comment:
   It was chosen to be the same as MAX_RECORDS_PER_BATCH, which is also 10000.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

2023-05-26 Thread via GitHub


mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207096649


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -145,44 +144,47 @@ class ZkMigrationClient(
 topicClient.iterateTopics(
   util.EnumSet.allOf(classOf[TopicVisitorInterest]),
   new TopicVisitor() {
-  override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {

Review Comment:
   Just fixing the indentation here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #13767: KAFKA-15004: Fix configuration dual-write during migration

2023-05-26 Thread via GitHub


mumrah opened a new pull request, #13767:
URL: https://github.com/apache/kafka/pull/13767

   This PR builds on top of #13736.
   
   Fixes the following:
   
   * Topic configs are not synced while handling a snapshot.
   * New broker/topic configs in KRaft that did not exist in ZK will not be 
sync'd to ZK.
   * The sensitive configs are not encoded while writing them to Zookeeper.
   * Handle topic configs in ConfigMigrationClient and 
KRaftMigrationZkWriter#handleConfigsSnapshot
   
   Added tests to ensure we no longer have the above mentioned issues.
   
   Co-Authored-By: Akhilesh C 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14996) The KRaft controller should properly handle overly large user operations

2023-05-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726688#comment-17726688
 ] 

Edoardo Comar commented on KAFKA-14996:
---

Opened a PR to allow responding gracefully with an INVALID_PARTITION error to the clients:

https://github.com/apache/kafka/pull/13766

> The KRaft controller should properly handle overly large user operations
> 
>
> Key: KAFKA-14996
> URL: https://issues.apache.org/jira/browse/KAFKA-14996
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.5.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Blocker
>
> If an attempt is made to create a topic with
> num partitions >= QuorumController.MAX_RECORDS_PER_BATCH  (10000)
> the client receives an UnknownServerException - it could rather receive a 
> better error.
> The controller logs
> {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed 
> with unknown server exception IllegalStateException at epoch 2 in 21956 us.  
> Renouncing leadership and reverting to the last committed offset 174. 
> (org.apache.kafka.controller.QuorumController)}}
> {{java.lang.IllegalStateException: Attempted to atomically commit 10001 
> records, but maxRecordsPerBatch is 10000}}
> {{    at 
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}}
> {{    at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}}
> {{    at java.base/java.lang.Thread.run(Thread.java:829)}}
> {{[}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-26 Thread via GitHub


erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1564684675

   > @philipnee I will try to review it this week. Thanks!
   
   Hi @dajac, did you already get the chance to look at this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] edoardocomar opened a new pull request, #13766: KAFKA-14996: Limit partition count in Create Topic and Create Partitions

2023-05-26 Thread via GitHub


edoardocomar opened a new pull request, #13766:
URL: https://github.com/apache/kafka/pull/13766

   Limit partition count in Create Topic and Create Partitions
   to avoid exceeding the QuorumController's MAX_RECORDS_PER_BATCH, 
   and respond gracefully to the client.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-26 Thread via GitHub


yashmayya commented on PR #13465:
URL: https://github.com/apache/kafka/pull/13465#issuecomment-1564677170

   Thanks Chris! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-26 Thread via GitHub


splett2 commented on code in PR #13765:
URL: https://github.com/apache/kafka/pull/13765#discussion_r1207073563


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition,
 // avoid unnecessary collection generation
 val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
 var newHighWatermark = leaderLogEndOffset
-remoteReplicasMap.values.foreach { replica =>
+remoteReplicasMap.foreachEntry { (replicaId, replica) =>

Review Comment:
   Should we have a test in `PartitionTest` to assert that the HWM is 
incremented when there is a replica that is fenced but caught up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils

2023-05-26 Thread via GitHub


C0urante merged PR #13334:
URL: https://github.com/apache/kafka/pull/13334


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-26 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+// Send concurrent generateProducerId requests. Ensure that the generated 
producer id is unique.
+// For each block (total 3 blocks), only "idBlockLen" number of requests 
should go through.
+// All other requests should fail immediately.
+
+val numThreads = 5
+val latch = new CountDownLatch(idBlockLen * 3)
 val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-IntStream.range(0, idBlockLen * 3).forEach { i =>
-  assertEquals(i, manager.generateProducerId())
+val pidMap = mutable.Map[Long, Int]()
+val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+for ( _ <- 0 until numThreads) {
+  requestHandlerThreadPool.submit(() => {
+while(latch.getCount > 0) {
+  val result = manager.generateProducerId()
+  result match {
+case Success(pid) =>
+  pidMap synchronized {
+if (latch.getCount != 0) {
+  val counter = pidMap.getOrElse(pid, 0)
+  pidMap += pid -> (counter + 1)
+  latch.countDown()
+}
+  }
+
+case Failure(exception) =>
+  assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+  }
+  Thread.sleep(100)
+}
+  }, 0)
+}
+assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   I would keep it a bit higher so that it does not become flaky. Have you run 
it a few times on your own to make sure it is not flaky already?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils

2023-05-26 Thread via GitHub


C0urante commented on code in PR #13334:
URL: https://github.com/apache/kafka/pull/13334#discussion_r1207028289


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) {
 return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
 }
 
-public static List pluginLocations(Path topPath) throws IOException {
+public static List pluginLocations(String pluginPath) {
+if (pluginPath == null) {
+return Collections.emptyList();
+}
+String[] pluginPathElements = 
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
+List pluginLocations = new ArrayList<>();
+for (String path : pluginPathElements) {
+try {
+Path pluginPathElement = Paths.get(path).toAbsolutePath();
+// Currently 'plugin.paths' property is a list of top-level 
directories
+// containing plugins
+if (Files.isDirectory(pluginPathElement)) {
+pluginLocations.addAll(pluginLocations(pluginPathElement));
+} else if (isArchive(pluginPathElement)) {
+pluginLocations.add(pluginPathElement);
+}

Review Comment:
    Thanks, LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider

2023-05-26 Thread via GitHub


C0urante merged PR #13356:
URL: https://github.com/apache/kafka/pull/13356


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207013139


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Thinking about this some more, I believe the key requirement is to 
accurately understand when we do not have a correct view of the cluster.  This 
allows us to handle 2 important cases: not fencing a broker if its session times
out but we think we could have missed enough heartbeats that fencing would be the
wrong decision; and fencing a broker whose session times out whenever we think we
have accurate information.  I think I addressed the second part (detect and fence
a crashed broker as quickly as possible) in the
comment above: I believe we have accurate information if we see a contiguous 
series of N successfully-processed heartbeats with no intervening timed-out 
heartbeats where N is perhaps the cluster size.  For the first part (don't 
fence if we think the decision to do so could be wrong) assume the broker 
session is 18 seconds and the heartbeat interval is 2 seconds.  That means we 
would need to miss 9 heartbeats for a broker in order to incorrectly fence it.  
Maybe 
 we keep track of the number of contiguous successful heartbeats with no 
intervening misses (which, if we aren't missing any would always be a very high 
number).  But then as soon as we miss one we increment the missed count and 
reset the contiguous count to 0.  When we successfully process a heartbeat we 
increment the contiguous count and, if it reaches the necessary threshold N 
(which is on the order of the cluster size) we reset the missed count to 0.  We 
can fence brokers only while the missed count is less than the session/interval 
ratio (i.e. 18/2 = 9 by default).
   
   We can tweak this to be a bit more conservative.  Maybe we need N (the 
number of contiguous heartbeats seen to assure us we have good visibility) to 
be 1.5 or 2 times the broker count instead of just the broker count.  Maybe the 
missed count only has to exceed half the session/interval ratio (so only 
missing 5 heartbeats without seeing N successfully-processed ones in a row 
instead of 9 by default) to prevent fencing.
   
   WDYT?
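
   To make the bookkeeping concrete, here is a rough sketch of the counters
   described above (names and thresholds are illustrative, not actual
   BrokerHeartbeatManager code):
   ```
   // Hypothetical tracker for "do we currently trust our view of the cluster?"
   public class HeartbeatVisibilityTracker {
       private final int contiguousThreshold;    // e.g. on the order of the cluster size
       private final int maxMissedBeforeUnsafe;  // e.g. session timeout / heartbeat interval
       private int contiguousProcessed = 0;
       private int missed = 0;

       public HeartbeatVisibilityTracker(int contiguousThreshold, int maxMissedBeforeUnsafe) {
           this.contiguousThreshold = contiguousThreshold;
           this.maxMissedBeforeUnsafe = maxMissedBeforeUnsafe;
       }

       public void onHeartbeatProcessed() {
           contiguousProcessed++;
           if (contiguousProcessed >= contiguousThreshold) {
               missed = 0;  // enough uninterrupted heartbeats: trust the view again
           }
       }

       public void onHeartbeatTimedOut() {
           missed++;
           contiguousProcessed = 0;
       }

       // Fencing on session timeout is only considered safe while we have not missed
       // enough heartbeats to explain the apparent silence.
       public boolean safeToFence() {
           return missed < maxMissedBeforeUnsafe;
       }
   }
   ```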



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207013139


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Thinking about this some more, I believe the key requirement is to 
accurately understand when we do not have a correct view of the cluster.  This 
allows us to handle 2 important cases: not fencing a broker if its session times
out but we think we could have missed enough heartbeats that fencing would be the
wrong decision; and fencing a broker whose session times out whenever we think we
have accurate information.  I think I addressed the second part (detect and fence
a crashed broker as quickly as possible) in the
comment above: I believe we have accurate information if we see a contiguous 
series of N successfully-processed heartbeats with no intervening timed-out 
heartbeats where N is perhaps the cluster size.  For the first part (don't 
fence if we think the decision to do so could be wrong) assume the broker 
session is 18 seconds and the heartbeat interval is 2 seconds.  That means we 
would need to miss 9 heartbeats for a broker in order to incorrectly fence it.  
Maybe 
 we keep track of the last time we had enough contiguous successful heartbeats 
(which, if we aren't missing any would always be very recent).  But then as 
soon as we miss one we increment the missed count and reset the contiguous 
count to 0.  When we successfully process a heartbeat we increment the 
contiguous count and, if it reaches the necessary threshold N (which is on the 
order of the cluster size) we reset the missed count to 0.  We can fence 
brokers only while the missed count is less than the session/interval ratio 
(i.e. 18/2 = 9 by default).
   
   We can tweak this to be a bit more conservative.  Maybe we need N (the 
number of contiguous heartbeats seen to assure us we have good visibility) to 
be 1.5 or 2 times the broker count instead of just the broker count.  Maybe the 
missed count only has to exceed half the session/interval ratio (so only 
missing 5 heartbeats without seeing N successfully-processed ones in a row 
instead of 9 by default) to prevent fencing.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207013139


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Thinking about this some more, I believe the key requirement is to 
accurately understand when we do not have a correct view of the cluster.  This 
allows us to handle 2 important cases: not fencing a broker if its session times
out but we think we could have missed enough heartbeats that fencing would be the
wrong decision; and fencing a broker whose session times out whenever we think we
have accurate information.  I think I addressed the second part (detect and fence
a crashed broker as quickly as possible) in the
comment above: I believe we have accurate information if we see a contiguous 
series of N successfully-processed heartbeats with no intervening timed-out 
heartbeats where N is perhaps the cluster size.  For the first part (don't 
fence if we think the decision to do so could be wrong) assume the broker 
session is 18 seconds and the heartbeat interval is 2 seconds.  That means we 
would need to miss 9 heartbeats for a broker in order to incorrectly fence it.  
Maybe 
 we keep track of the last time we had enough contiguous successful heartbeats 
(which, if we aren't missing any would always be very recent).  But then as 
soon as we miss one we increment the missed count and reset the contiguous 
count to 0.  When we successfully process a heartbeat we increment the 
contiguous count and, if it reaches the necessary threshold N (which is on the 
order of the cluster size) we reset the missed count to 0.  We can fence 
brokers only while the missed count is less than the session/interval ratio 
(i.e. 18/2 = 9 by default).
   
   We can tweak this to be a bit more conservative.  Maybe we need N (the 
number of contiguous heartbeats seen to assure us we have good visibility) to 
be 1.5 or 2 times the broker count instead of just the broker count.  Maybe the 
missed count only has to exceed half the session/interval ratio (so only 
missing 5 heartbeats without seeing N successfully-processed ones in a row 
would prevent fencing instead of 9 by default).
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-26 Thread via GitHub


C0urante merged PR #13465:
URL: https://github.com/apache/kafka/pull/13465


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-26 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1207013139


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Thinking about this some more, I believe the key requirement is to 
accurately understand when we do not have a correct view of the cluster.  This 
allows us to handle 2 important cases: not fencing a broker if its session times
out but we think we could have missed enough heartbeats that fencing would be the
wrong decision; and fencing a broker whose session times out whenever we think we
have accurate information.  I think I addressed the second part (detect and fence
a crashed broker as quickly as possible) in the
comment above: I believe we have accurate information if we see a contiguous 
series of N successfully-processed heartbeats with no intervening timed-out 
heartbeats where N is perhaps the cluster size.  For the first part (don't 
fence if we think the decision to do so could be wrong) assume the broker 
session is 18 seconds and the heartbeat interval is 2 seconds.  That means we 
would need to miss 9 heartbeats for a broker in order to incorrectly fence it.  
Maybe 
 we keep track of the last time we had enough contiguous successful heartbeats 
(which, if we aren't missing any would always be very recent).  But then as 
soon as we miss one we increment the missed count and reset the contiguous 
count to 0.  When we successfully process a heartbeat we increment the 
contiguous count and, if it reaches the necessary threshold N (which is on the 
order of the cluster size) we reset the missed count to 0.  We can fence 
brokers only when the missed count approaches the session/interval ratio (i.e. 
18/2 = 9 by default).
   
   We can tweak this to be a bit more conservative.  Maybe we need N (the 
number of contiguous heartbeats seen to assure us we have good visibility) to 
be 1.5 or 2 times the broker count instead of just the broker count.  Maybe the 
missed count only has to exceed half the session/interval ratio (so only 
missing 5 heartbeats without seeing N successfully-processed ones in a row 
would prevent fencing instead of 9 by default).
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206991344


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Did we decide that the TopicIdPartition for an old request version would just
have a zero topic id? Could you give a brief outline of old vs. new request
versions and how they are handled (i.e. the in-memory representation while
handling, plus what we return in the response for the happy path and error cases)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-26 Thread via GitHub


C0urante commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1206885931


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -695,9 +705,28 @@ private class HandleRebalance implements 
ConsumerRebalanceListener {
 @Override
 public void onPartitionsAssigned(Collection 
partitions) {
 log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
partitions);
-
+Set deletedTopics = new HashSet<>();
 for (TopicPartition tp : partitions) {
-long pos = consumer.position(tp);
+if (deletedTopics.contains(tp.topic())) {
+log.debug("Not assigning offsets for topic-partition {} 
since the topic {} has been deleted", tp, tp.topic());
+continue;
+}
+long pos;
+try {
+pos = consumer.position(tp);
+} catch (TimeoutException e) {
+log.error("TimeoutException occurred when fetching 
position for topic partition {}. " +
+"Checking if the topic {} exists", tp, tp.topic());
+Map topic = 
topicAdmin.describeTopics(tp.topic());

Review Comment:
   This adds new ACL requirements for sink connectors' admin clients, which is 
a breaking change and cannot be done until the next major release.
   
   We also need to handle generic exceptions thrown during this call. Probably 
safest to assume that the topic exists if we fail to describe it.
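
   Something along these lines, for illustration (not the PR's code; it assumes
   `TopicAdmin#describeTopics` omits topics that do not exist, which is what the
   PR already relies on):
   ```
   import org.apache.kafka.connect.util.TopicAdmin;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical helper: treat any failure to describe as "the topic still exists".
   public final class TopicExistenceCheck {
       private static final Logger log = LoggerFactory.getLogger(TopicExistenceCheck.class);

       public static boolean topicStillExists(TopicAdmin admin, String topic) {
           try {
               return !admin.describeTopics(topic).isEmpty();
           } catch (Exception e) {
               log.warn("Failed to describe topic {}; assuming it still exists", topic, e);
               return true;
           }
       }
   }
   ```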



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements 
ConsumerRebalanceListener {
 @Override
 public void onPartitionsAssigned(Collection 
partitions) {
 log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
partitions);
-
+Set deletedTopics = new HashSet<>();
 for (TopicPartition tp : partitions) {
-long pos = consumer.position(tp);
+if (deletedTopics.contains(tp.topic())) {

Review Comment:
   We should not be relying on undocumented behavior like this without 
confirmation from someone familiar with the clients library that it's 
intentional, or at the very least, that it won't change in the future without a 
KIP.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task,
 connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK);
 KafkaConsumer consumer = new 
KafkaConsumer<>(consumerProps);
 
+Map adminOverrides = adminConfigs(id.connector(), 
"connector-worker-adminclient-" + id.connector(),

Review Comment:
   We already construct an admin client for sink connectors [if they use a DLQ 
topic](https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L947-L951).
   
   We should create at most one admin client per sink task, which means 
probably refactoring the `sinkTaskReporters` method to accept an admin client 
if we're going to be creating one unconditionally here.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements 
ConsumerRebalanceListener {
 @Override
 public void onPartitionsAssigned(Collection 
partitions) {
 log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
partitions);
-
+Set deletedTopics = new HashSet<>();
 for (TopicPartition tp : partitions) {
-long pos = consumer.position(tp);
+if (deletedTopics.contains(tp.topic())) {

Review Comment:
   Also, this entire block relies on racy logic to detect deleted topics that 
may fail if a topic is deleted after this check takes place. Again, we should 
reach out to someone familiar with the clients library. It's unlikely that 
Kafka Connect is the only consumer application that is impacted by this 
scenario and it's better to fix this at the clients level instead of 
implementing Connect-specific workarounds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


ijuma commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1206985698


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -457,9 +466,13 @@ private Throwable handleEventException(String name,
 long endProcessingTime = time.nanoseconds();
 long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
 long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-if (exception instanceof ApiException) {
+if ((exception instanceof ApiException) ||
+(exception instanceof BoundedListTooLongException)) {
 log.info("{}: failed with {} in {} us. Reason: {}", name,
 exception.getClass().getSimpleName(), deltaUs, 
exception.getMessage());
+if (exception instanceof BoundedListTooLongException) {
+exception = new UnknownServerException(exception.getMessage());

Review Comment:
   `UnknownServerException` is pretty uninformative. Can we use 
`PolicyViolationException` with a string indicating the specifics? Not perfect, 
but the custom string sent back to the user can clarify what's going on.
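
   For illustration, the mapping being suggested could look roughly like this
   (sketch only, using the exception types discussed in this thread):
   ```
   import org.apache.kafka.common.errors.ApiException;
   import org.apache.kafka.common.errors.PolicyViolationException;

   // Hypothetical mapping: surface the internal "too many records" failure to clients
   // as a PolicyViolationException that carries the original, human-readable message.
   public final class ClientFacingErrors {
       public static ApiException tooManyRecords(Throwable internal) {
           return new PolicyViolationException(internal.getMessage());
       }
   }
   ```
   The `handleEventException` branch quoted above would then wrap the
   BoundedListTooLongException with this instead of an UnknownServerException.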



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


ijuma commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1206985698


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -457,9 +466,13 @@ private Throwable handleEventException(String name,
 long endProcessingTime = time.nanoseconds();
 long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
 long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-if (exception instanceof ApiException) {
+if ((exception instanceof ApiException) ||
+(exception instanceof BoundedListTooLongException)) {
 log.info("{}: failed with {} in {} us. Reason: {}", name,
 exception.getClass().getSimpleName(), deltaUs, 
exception.getMessage());
+if (exception instanceof BoundedListTooLongException) {
+exception = new UnknownServerException(exception.getMessage());

Review Comment:
   This is pretty uninformative. Can we use `PolicyViolationException` with a 
string indicating the specifics? Not perfect, but the ability to include a 
custom string is super helpful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-05-26 Thread via GitHub


machi1990 commented on PR #13611:
URL: https://github.com/apache/kafka/pull/13611#issuecomment-1564562097

   Thanks @jsancio for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-05-26 Thread via GitHub


jsancio merged PR #13611:
URL: https://github.com/apache/kafka/pull/13611


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

2023-05-26 Thread via GitHub


mumrah commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564483362

   @divijvaidya Colin can correct me if I'm mistaken, but I believe this patch 
is mainly about closing an existing edge case until we implement KIP-868 
(metadata transactions). Once we have transactions in the controller, we can 
allow arbitrarily large batches of records.
   
   > I am concerned about the user facing aspect of this change. If I am a user 
and get this exception, what am I expected to do to resolve it?
   
   Right now, if you create a topic with more than ~10000 partitions, you'll get
a server error anyway. The controller fails to commit the batch, throws an
exception, and then renounces leadership.
   
   Here's what happens on the controller:
   ```
   [2023-05-26 10:24:28,308] DEBUG [QuorumController id=1] Got exception while 
running createTopics(1813420413). Invoking handleException. 
(org.apache.kafka.queue.KafkaEventQueue)
   java.lang.IllegalStateException: Attempted to atomically commit 20001 
records, but maxRecordsPerBatch is 1
at 
org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)
at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.lang.Thread.run(Thread.java:750)
   [2023-05-26 10:24:28,314] INFO [RaftManager id=1] Received user request to 
resign from the current epoch 3 (org.apache.kafka.raft.KafkaRaftClient)
   [2023-05-26 10:24:28,323] INFO [RaftManager id=1] Failed to handle fetch 
from 2 at 142 due to NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.raft.KafkaRaftClient)
   ```
   
   And the client sees:
   ```
   [2023-05-26 10:24:28,351] ERROR 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
(kafka.admin.TopicCommand$)
   ```
   
   So, really this patch isn't changing anything from the client's perspective. It just prevents the controller from renouncing (which is the real problem).
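   
   For context, the guard that produces the error above is essentially a size check applied before the records reach the Raft layer. A minimal, self-contained illustration (the class and exception names here are stand-ins, not the controller's actual code):
   
   ```java
   import java.util.List;

   // Stand-in for the PR's BoundedListTooLongException.
   final class TooManyRecordsException extends RuntimeException {
       TooManyRecordsException(String message) {
           super(message);
       }
   }

   // Illustration only: reject an oversized atomic batch up front with an exception
   // the request handler can translate into a client error, instead of letting it
   // escape and force the controller to renounce leadership.
   final class BatchSizeGuard {
       static void check(List<?> records, int maxRecordsPerBatch) {
           if (records.size() > maxRecordsPerBatch) {
               throw new TooManyRecordsException("Attempted to atomically commit "
                   + records.size() + " records, but maxRecordsPerBatch is "
                   + maxRecordsPerBatch);
           }
       }
   }
   ```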


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-05-26 Thread via GitHub


vvcephei merged PR #13455:
URL: https://github.com/apache/kafka/pull/13455


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-26 Thread via GitHub


machi1990 commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1564402596

   Thanks @philipnee and @vvcephei for the review and merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-26 Thread via GitHub


vvcephei merged PR #13664:
URL: https://github.com/apache/kafka/pull/13664


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-05-26 Thread via GitHub


Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206778975


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Sure, that's fine! I will go ahead and start with the approach discussed last time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse

2023-05-26 Thread via GitHub


dajac commented on PR #13764:
URL: https://github.com/apache/kafka/pull/13764#issuecomment-1564350280

   I have a few comments/questions:
   * I am not really comfortable with merging this without the server side 
implementation. @clolov Is there a strong reason to not do them together?
   * I agree with @Hangleton that it may be better to start with adding the TopicId only. This is complicated enough on its own. We can add the other fields afterwards.
   * I agree with @jolshan that we should set `"latestVersionUnstable": true` 
while in development.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-05-26 Thread via GitHub


dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206732981


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   I don't recall the details now. I have to get back to it...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-05-26 Thread via GitHub


Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206702136


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Hi David and Justine, coming back to this, apologies for the delay. Are you comfortable with using `TopicIdPartition` as suggested above, or should we just keep `TopicPartition` and do the resolution to `TopicIdPartition` in `KafkaApis`? What do you think?
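   
   If the resolution were done at the request-handling layer instead, it would boil down to a small name-to-id mapping step. A hedged sketch (the `topicIds` map stands in for a metadata cache snapshot; this is not the actual `KafkaApis` code):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.Uuid;

   // Illustration only: resolve topic names to ids before handing partitions to the
   // coordinator. Unknown topics fall back to Uuid.ZERO_UUID so the caller can mark
   // them as unknown in the response.
   final class TopicIdResolution {
       static List<TopicIdPartition> resolve(List<TopicPartition> partitions,
                                             Map<String, Uuid> topicIds) {
           List<TopicIdPartition> resolved = new ArrayList<>(partitions.size());
           for (TopicPartition tp : partitions) {
               Uuid id = topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
               resolved.add(new TopicIdPartition(id, tp));
           }
           return resolved;
       }
   }
   ```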



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-26 Thread via GitHub


Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1564157619

   Hi Igor, thanks for the review. I added the changes you reminded me about above. I am going through additional test runs for this integration test to ensure there are no intermittent failures. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.

2023-05-26 Thread via GitHub


mimaison commented on PR #13748:
URL: https://github.com/apache/kafka/pull/13748#issuecomment-1564113409

   Good catch! 
   Yes, it would be good to have this in 3.5. @krespo, can you update your PR with the suggestions from @gharris1727? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage

2023-05-26 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726550#comment-17726550
 ] 

Luke Chen commented on KAFKA-14953:
---

Sounds good! Thank you!

> Add metrics for tiered storage
> --
>
> Key: KAFKA-14953
> URL: https://issues.apache.org/jira/browse/KAFKA-14953
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Abhijeet Kumar
>Priority: Minor
>
> Not just for expired fetch. We also need to add all the metrics described in 
> KIP-405
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
>  
> ref: [https://github.com/apache/kafka/pull/13535#discussion_r1180286031] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-26 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206322259


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a response 

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-26 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206317753


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", 
false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10,