Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]

2024-03-13 Thread via GitHub


kamalcph commented on PR #15523:
URL: https://github.com/apache/kafka/pull/15523#issuecomment-1996675858

   Thanks for the review!
   
   > If the cleaner thread wasn't even started, are we really testing correct behaviour? For example, when we validate that the cleaner shouldn't have deleted something, we would happily think that the cleaner ran but didn't delete anything, whereas in reality it may never have run at all. Should we instead add a wait loop in the test's cache setup to ensure that the cleaner starts running?
   
   The RemoteIndexCache (RIC) and RemoteIndexCacheTest (RICT) are in different packages, so we cannot introduce a new package-private method in the RIC and override it in the RICT to ensure that the cleaner thread has started. Also, ShutdownableThread is widely used by various modules, so we avoid changing the thread's behavior.
   
   1. The ShutdownableThread source code is in Java, so we wrote the new unit tests in Java.
   2. In this test, we don't expect any entries to expire. The test fails 2% of the time, which means that in the remaining 98% the cleaner thread ran. In the assertion, we ensure that the cleaner thread closes correctly in both cases.
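   
   For illustration, a minimal sketch of such a wait loop, assuming a hypothetical package-visible `cleanerThread()` accessor -- which, as noted above, the real RemoteIndexCache cannot expose to the test package:
   
   ```java
   import org.apache.kafka.test.TestUtils;
   
   class RemoteIndexCacheTestSetupSketch {
       // cleanerThread() is an assumed accessor; RemoteIndexCache does not expose one.
       static void awaitCleanerStarted(RemoteIndexCache cache) throws InterruptedException {
           TestUtils.waitForCondition(
               () -> cache.cleanerThread().isAlive(),
               30_000L,
               "cleaner thread did not start in time");
       }
   }
   ```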


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1524022035


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-      new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
+      new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+    })
+    queue.add(pendingCompleteTxnAndMarker)
+
+    if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+      // This could happen if the queue got removed concurrently.

Review Comment:
   As far as I can see, it shouldn't affect the user-visible behavior. It does create an interesting state when the queue is removed in removeMarkersForTxnTopicPartition -- we could have:
   1. [addMarkers] Retrieve queue.
   2. [removeMarkersForTxnTopicPartition] Remove queue.
   3. [removeMarkersForTxnTopicPartition] Iterate over queue, but don't call removeMarkersForTxn because the queue is empty.
   4. [addMarkers] Add markers to the queue.
   
   Now we've effectively removed the markers while transactionsWithPendingMarkers has an entry.
   
   This state could last for a while if the removal happened on unload (and technically the txn id could expire, etc., so the state may persist until broker restart), but as soon as real traffic on this txn id sends out markers, the proper entry will be created and the actual functionality will work as expected.
   
   In other words, this race can lead to an orphan entry in transactionsWithPendingMarkers, but it doesn't affect anything (other than leaking a small amount of memory) until markers are sent, and sending markers will fix it.
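   
   For reference, a minimal sketch of the check-after-add pattern above, with simplified types (a plain ConcurrentHashMap and String markers, not the actual TxnMarkerQueue code):
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.LinkedBlockingQueue;
   
   final class MarkerQueuesSketch {
       private final ConcurrentHashMap<Integer, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
   
       void addMarker(int partition, String marker) {
           BlockingQueue<String> queue = queues.computeIfAbsent(partition, p -> new LinkedBlockingQueue<>());
           queue.add(marker);
           // Re-check after the add: if the queue was removed concurrently
           // (partition emigration), the marker now sits in an orphaned queue.
           // As described above, that is benign -- resending markers recreates
           // the proper entry.
           if (queues.get(partition) != queue) {
               // queue was removed between computeIfAbsent() and add()
           }
       }
   
       BlockingQueue<String> removeQueue(int partition) {
           return queues.remove(partition);
       }
   }
   ```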



##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
   val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
   val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-  for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {

Review Comment:
   I agree this code could benefit from some refactoring; we should probably structure it so that instead of branching on wasDisconnected at the top, it just iterates over the pending entries and checks wasDisconnected in the specific cases. But I think that should be done separately, as this change is fairly mechanical, which simplifies the review of what it does.




Re: [PR] MINOR: Kafka Streams docs fixes [kafka]

2024-03-13 Thread via GitHub


mjsax merged PR #15517:
URL: https://github.com/apache/kafka/pull/15517





Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-13 Thread via GitHub


mjsax merged PR #15519:
URL: https://github.com/apache/kafka/pull/15519





Re: [PR] KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15495:
URL: https://github.com/apache/kafka/pull/15495#discussion_r1524038413


##
docs/streams/developer-guide/memory-mgmt.html:
##
@@ -151,6 +151,10 @@
 Serdes.Long())
   .withCachingEnabled();
   Record caches are not supported for versioned state stores.
+  Caution: When using withCachingEnabled(),
+    if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,
+    the value of the key that hasn't been flushed from the cache may be returned as a stale value.
+    Therefore, when deleting, you must flush() the cache before the iterator.

Review Comment:
   I would rephrase this sentence:
   ```
   To avoid reading stale data, you can `flush()` the store before creating the iterator. Note that flushing too often can lead to performance degradation if RocksDB is used, so we advise avoiding manual flushes in general.
   ```
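   
   A minimal sketch of the flush-before-iterate pattern being suggested, assuming a caching-enabled KeyValueStore obtained from the processor context:
   
   ```java
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.KeyValueStore;
   
   class FlushBeforeIterateSketch {
       static void deleteThenScan(final KeyValueStore<String, Long> store) {
           store.delete("some-key");
           store.flush();  // push cached entries down so the iterator doesn't serve stale values
           try (final KeyValueIterator<String, Long> iter = store.all()) {
               while (iter.hasNext()) {
                   final KeyValue<String, Long> entry = iter.next();
                   // entry reflects the delete above
               }
           }
       }
   }
   ```
   
   As the comment above notes, flushing on every scan can degrade RocksDB performance, so this should be used sparingly.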



##
docs/streams/developer-guide/memory-mgmt.html:
##
@@ -151,6 +151,10 @@
 Serdes.Long())
   .withCachingEnabled();
   Record caches are not supported for versioned state stores.
+  Caution: When using withCachingEnabled(),
+    if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,

Review Comment:
   Cannot remember from the ticket, but was this limited to deletes, or also 
updates?



##
docs/streams/developer-guide/memory-mgmt.html:
##
@@ -151,6 +151,10 @@
 Serdes.Long())
   .withCachingEnabled();
   Record caches are not supported for versioned state stores.
+  Caution: When using withCachingEnabled(),
+    if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,

Review Comment:
   ```suggestion
   if you delete() a key from the store while iterating (e.g., KeyValueIterator) through the keys,
   ```






Re: [PR] MINOR: Fix descriptions of raft-metrics in docs/ops.html [kafka]

2024-03-13 Thread via GitHub


github-actions[bot] commented on PR #14973:
URL: https://github.com/apache/kafka/pull/14973#issuecomment-1996325738

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] Fix so that a partition is retained if another partition on same… [kafka]

2024-03-13 Thread via GitHub


github-actions[bot] commented on PR #15019:
URL: https://github.com/apache/kafka/pull/15019#issuecomment-1996325711

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524147955


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   Updated






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524143322


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   Sure, sounds good to me






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524140865


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   > I like the idea of migrating `newArrayBacked(int maxLength)` to `newLinkedListBacked(int maxLength)`; this would also help cut down on ArrayList resizing time, as currently this constructor creates a default ArrayList, which has an initial capacity of 10
   
   I did not suggest using a linked list for the current usage 🫢 
   
   My point was that keeping the constructor gives us the flexibility to change the list implementation in the future.
   
   We need to consider the cost of iteration before adopting a linked list.
   
   To simplify this PR and address the original proposal, changing the scope from public to private is good enough. WDYT?
   






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on PR #15507:
URL: https://github.com/apache/kafka/pull/15507#issuecomment-1996246082

   @chia7712 @jolshan Pushed an update based on discussion





Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524100301


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   I like the idea of migrating `newArrayBacked(int maxLength)` to `newLinkedListBacked(int maxLength)`; this would also help cut down on ArrayList resizing time, as currently this constructor creates a default ArrayList, which has an initial capacity of 10






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524078277


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   1. Maybe `private` is enough. Another view is that we could have `newListBacked` for more flexible usage in the future, with the implementation based on the constructor taking a passed list.
   
   ```java
   public static <E> BoundedList<E> newListBacked(int maxLength) {
       return new BoundedList<>(maxLength, new LinkedList<>());
   }
   ```
   
   2. "ArrayList resizing operations don't seem to be a concern to people." -> the caller can fix it by specifying `initialCapacity`.






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524075123


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   I think so. It looks like the consensus is:
   1. Remove the constructor that takes in a list.
   2. Keep both versions of `newArrayBacked`, as ArrayList resizing operations don't seem to be a concern to people.






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524073993


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);

Review Comment:
   Got it, so we can keep the existing `newArrayBacked()` factory methods and get rid of the constructor that takes in a list.






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524070820


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);

Review Comment:
   Yeah, it seems like we want to make the constructor private and use the newArrayBacked methods to accomplish what we want.
   
   I think we can all agree that directly passing in the underlying ArrayList is not what we want 👍 






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524069898


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment:
   My understanding here is that we do want to know an expected size to start out with, but we shouldn't necessarily link it directly to a list we pass in, as the constructor does. Does that mean the deleted code here is actually the behavior we want?






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-13 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1524067155


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
Review Comment:
   It's because the fix for "returning the first offset when multiple records have the same maxTimestamp" for "non-compressed" records is in this PR: https://github.com/apache/kafka/pull/15476 . We should add the logAppendTime test there. :) cc @johnnychhsu
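   
   For context, a minimal sketch of the admin query these tests exercise (real Admin API; topic and partition values are illustrative):
   
   ```java
   import java.util.Collections;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.ListOffsetsResult;
   import org.apache.kafka.clients.admin.OffsetSpec;
   import org.apache.kafka.common.TopicPartition;
   
   class MaxTimestampOffsetSketch {
       // Returns the offset of the record with the max timestamp (KIP-734).
       // With LogAppendTime and a single batch, all records carry the batch
       // append time, so offset 0 is the expected answer.
       static long offsetOfMaxTimestamp(Admin admin, String topic) throws Exception {
           TopicPartition tp = new TopicPartition(topic, 0);
           ListOffsetsResult result = admin.listOffsets(
               Collections.singletonMap(tp, OffsetSpec.maxTimestamp()));
           return result.partitionResult(tp).get().offset();
       }
   }
   ```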






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524061026


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);

Review Comment:
   Changing the access scope fixes the issue you described:
   
   > This is problematic because the constructor created the BoundedList using a reference to underlying, not a copy of it; therefore, a user could add elements to underlying instead of their newly instantiated BoundedList and force the BoundedList to be larger than its maxLength.
   
   We should prevent that usage in code, so making the constructor `private` is good practice. All callers should use `newArrayBacked` to create a `BoundedList`.
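   
   A minimal sketch of that shape (simplified; the real BoundedList implements the full List interface):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public final class BoundedListSketch<E> {
       private final int maxLength;
       private final List<E> underlying;
   
       public static <E> BoundedListSketch<E> newArrayBacked(int maxLength) {
           return new BoundedListSketch<>(maxLength, new ArrayList<>());
       }
   
       public static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
           return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
       }
   
       private BoundedListSketch(int maxLength, List<E> underlying) {
           if (maxLength <= 0) {
               throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
           }
           this.maxLength = maxLength;
           this.underlying = underlying;  // safe: the reference never escapes the factories
       }
   
       public boolean add(E e) {
           if (underlying.size() >= maxLength) {
               throw new IllegalStateException("Cannot exceed maximum length " + maxLength);
           }
           return underlying.add(e);
       }
   }
   ```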






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1524056640


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
 
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   > I'm not sure what you mean, could you please elaborate?
   
   Sorry for the unclear comment.
   
   It seems to me "initialCapacity" is still useful if we expect the size of the batch to be large. For example, creating/deleting a bunch of topics is a common usage. For such use cases, it would be better to set the initial capacity to avoid the resize problem you described.
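   
   For example, a hypothetical caller that knows the batch size up front:
   
   ```java
   // Passing the expected size as initialCapacity sizes the backing ArrayList
   // once, avoiding repeated resizing while the list stays bounded.
   int expectedResults = 10_000;
   BoundedList<String> results = BoundedList.newArrayBacked(expectedResults, expectedResults);
   ```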
   
   
   






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1524044954


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+
+public class KStreamKStreamWindowCloseTest {
+
+    private static final String LEFT = "left";
+    private static final String RIGHT = "right";
+    private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private static final Consumed<Integer, String> CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String());
+    private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5));
+
+    static List<Arguments> streams() {
+        return Arrays.asList(
+            innerJoin(),
+            leftJoin(),
+            outerJoin()
+        );
+    }
+
+    private static Arguments innerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).join(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments leftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).leftJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments outerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).outerJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream.process(supplier);
+        return Arguments.of(builder.build(PROPS), supplier);
+    }
+
+    @ParameterizedTest
+    @MethodSource("streams")
+    public void recordsArrivingPostWindowCloseShouldBeDropped(
+        final Topology topology,
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier) {

Re: [PR] MINOR; Missing minISR config should log a debug message [kafka]

2024-03-13 Thread via GitHub


CalvinConfluent commented on code in PR #15529:
URL: https://github.com/apache/kafka/pull/15529#discussion_r1523997343


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2189,7 +2189,7 @@ int getTopicEffectiveMinIsr(String topicName) {
         if (minIsrConfig != null) {
             currentMinIsr = Integer.parseInt(minIsrConfig);
         } else {
-            log.warn("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr);
+            log.debug("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr);

Review Comment:
   Makes sense, thanks! Removed the error message.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523996339


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,144 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    // If the previous cursor points to the partition 0, the cursor will not be set as the first one
+                    // in the topic list should be the previous cursor topic.
+                    request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+                // The topicDescription for the cursor topic of the current batch.
+                TopicDescription nextTopicDescription = null;
+
+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinished

[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-13 Thread Apoorv Mittal (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826885#comment-17826885 ]

Apoorv Mittal commented on KAFKA-16359:
---

[~ijuma] here is the fix [https://github.com/apache/kafka/pull/15532]; the PR description explains why we require the code extracted from the plugin.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is
> defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published
> without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}}, or
> a new release should be published that corrects this defect.
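
For reference, a minimal way to inspect the offending attribute (standard java.util.jar API; the jar path is illustrative):

```java
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class PrintClassPath {
    public static void main(String[] args) throws Exception {
        try (JarFile jar = new JarFile("kafka-clients-3.7.0.jar")) {
            Manifest mf = jar.getManifest();
            // prints the bogus attribute (or null once the manifest is fixed)
            System.out.println(mf.getMainAttributes().getValue("Class-Path"));
        }
    }
}
```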



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-03-13 Thread via GitHub


apoorvmittal10 opened a new pull request, #15532:
URL: https://github.com/apache/kafka/pull/15532

   The issue [KAFKA-16359](https://issues.apache.org/jira/browse/KAFKA-16359) reported the inclusion of `kafka-clients` runtime dependencies in the MANIFEST.MF `Class-Path`.
   
   The root cause is the issue described [here](https://github.com/johnrengelman/shadow/issues/324) with the usage of the `shadow` plugin.
   
   The plugin [documentation](https://imperceptiblethoughts.com/shadow/configuration/#configuring-the-runtime-classpath) specifies that any dependency marked as `shadow` is treated as follows by the shadow plugin:
   
   1. It is added as a runtime dependency in the resultant pom.xml - [code here](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowExtension.groovy#L32)
   2. It is also added as `Class-Path` in `MANIFEST.MF` - [code here](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowJavaPlugin.groovy#L71)
   
   ### Resolution
   We do need the runtime dependencies available in the pom (1 above) but not in the manifest (2 above). There is also no clear way to separate the two behaviours, as both tasks rely on the `shadow` configuration.
   
   To fix this, I have defined another custom configuration named `shadowed`, which is later used to populate the correct pom (the [code is similar to what the shadow plugin does](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowExtension.groovy#L32) to populate the pom with runtime dependencies).
   
   Though this might seem like a workaround, I think it's the only way to fix the issue. I have checked other SDKs that suffered from the same issue; they went a similar route to populate the pom.
   
   ### Verification
   
   1. Exploded the jar with `jar -xvf kafka-clients.jar` and compared pre and post manifest contents.
   ![Screenshot 2024-03-13 at 9 37 16 PM](https://github.com/apache/kafka/assets/2861565/bffa07f1-f8a6-4c77-a4e0-3b38c0968fc7)
   
   2. Verified the pre and post resultant pom.
   ![Screenshot 2024-03-13 at 9 39 08 PM](https://github.com/apache/kafka/assets/2861565/edf7c936-eaab-4072-91dc-f0827cf1f5af)
   
   3. Verified pre and post jar tf output: `jar -tf kafka-clients-3.8.0-SNAPSHOT.jar > jar_tf_output`
   ![Screenshot 2024-03-13 at 9 38 48 PM](https://github.com/apache/kafka/assets/2861565/147a8570-0295-4edb-91a0-ddc7dab1e659)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523971792


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+            TopicDescription nextTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Correct, updated the comments.
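   
   For context, a simplified sketch of the cursor-style paging loop that the Call above implements (Cursor and Page are stand-ins, not the real DescribeTopicPartitions types):
   
   ```java
   import java.util.List;
   import java.util.function.Function;
   
   class CursorPagingSketch {
       record Cursor(String topicName, int partitionIndex) {}
       record Page(List<String> partitionInfo, Cursor nextCursor) {}
   
       static void fetchAll(Function<Cursor, Page> send) {
           Cursor cursor = null;  // the first request carries no cursor
           do {
               Page page = send.apply(cursor);
               // ...merge page.partitionInfo() into the partially finished description...
               cursor = page.nextCursor();  // null once the last page is returned
           } while (cursor != null);
       }
   }
   ```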






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


jsancio commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523954492


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -348,14 +357,15 @@ final class KafkaMetadataLog private (
       snapshotId.offset <= latestSnapshotId.offset &&
       log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) =>
         // Delete all segments that have a "last offset" less than the log start offset
-        log.deleteOldSegments()
+        val deletedSegments = log.deleteOldSegments()
         // Remove older snapshots from the snapshots cache
-        (true, forgetSnapshotsBefore(snapshotId))
+        val forgottenSnapshots = forgetSnapshotsBefore(snapshotId)
+        (deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots)

Review Comment:
   The old code was probably correct in practice. I just wanted to make clearer 
that either one could be true for this function to return true.






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


jsancio commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523951275


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -404,21 +414,33 @@ final class KafkaMetadataLog private (
    * all cases.
    *
    * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot
+   * should not be deleted
    */
-  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
-    if (snapshots.size < 2)
+  private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = {
+    if (snapshots.size < 2) {
       return false
+    }
 
     var didClean = false
     snapshots.keys.toSeq.sliding(2).foreach {
       case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
-        if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
-          didClean = true
-        } else {
-          return didClean
+        predicate(snapshot) match {
+          case Some(reason) =>
+            if (deleteBeforeSnapshot(nextSnapshot, reason)) {

Review Comment:
   Yes. `&&` only evaluates the right argument if the left argument is `true`. 
Which is what we want because we don't want to delete the snapshot if the 
predicate is false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Make string from array [kafka]

2024-03-13 Thread via GitHub


jsancio merged PR #15526:
URL: https://github.com/apache/kafka/pull/15526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523937621


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -557,6 +557,7 @@ class TransactionStateManager(brokerId: Int,
   loadingPartitions.remove(partitionAndLeaderEpoch)
 
   transactionsPendingForCompletion.foreach { txnTransitMetadata =>
+info(s"Sending txn markers for $txnTransitMetadata after loading 
partition $partitionId")

Review Comment:
   I appreciate this log as well, but I wonder if we could combine it into a 
single log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523936892


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: 
Int,
   val writeTxnMarkerResponse = 
response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
   val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-  for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {

Review Comment:
   again -- not related to the code changes, but there is a lot of duplicated 
code between the disconnected and the response cases. Most is the same for the 
state errors cases like coordinator_load_in_progress etc.
   
   I wonder if we could extract that out and put it above... 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523931941


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##
@@ -39,22 +39,23 @@ class TransactionMarkerRequestCompletionHandler(brokerId: 
Int,
 if (response.wasDisconnected) {
   trace(s"Cancelled request with header $requestHeader due to node 
${response.destination} being disconnected")
 
-  for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
-val transactionalId = txnIdAndMarker.txnId
-val txnMarker = txnIdAndMarker.txnMarkerEntry
+  for (pendingCompleteTxnAndMarker <- 
pendingCompleteTxnAndMarkerEntries.asScala) {
+val pendingCompleteTxn = pendingCompleteTxnAndMarker.pendingCompleteTxn
+val transactionalId = pendingCompleteTxn.transactionalId
+val txnMarker = pendingCompleteTxnAndMarker.txnMarkerEntry
 
 txnStateManager.getTransactionState(transactionalId) match {
 
   case Left(Errors.NOT_COORDINATOR) =>
 info(s"I am no longer the coordinator for $transactionalId; cancel 
sending transaction markers $txnMarker to the brokers")
 
-txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)

Review Comment:
   unrelated to the changes here, but the first person ("I am") logging here is 
somewhat strange 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1995835817

   hey @lucasbru, I took another full pass, left a few comments. Thanks for the 
changes!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523916766


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
 
markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach
 { queue =>
-  for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-removeMarkersForTxnId(entry.txnId)
+  for (entry <- queue.asScala) {

Review Comment:
   Do we have an idea for how many inflight markers we may have at once? Just 
want to make sure this log message isn't too spammy. I wonder if we could log 
all the markers in a single log 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to 
topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet assignedTopicIdPartitions = 
findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new 
LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet ownedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized 
subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet assignedTopicPartitions = 
toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic 
IDs are properly
-// supported in the centralized subscription state object. Note that 
this check is
-// required to make sure that reconciliation is not triggered if the 
assignment ready to
-// be reconciled is the same as the current one (even though the 
member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = 
assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   well I was expecting we only need to trigger the callbacks if the assignment 
changed (could be to empty, but something needs to change), and that's not the 
case if the member ends up with t1-1 again, that it already owns. 
   
   By running a full reconciliation when the the resolved assignment is the 
same as the current but received later, we end up with a client reconciling the 
exact same assignment it already owns :S It would turn out noisy I expect, 
accounting for 2 rebalances in cases probably much more common, where a new 
topic assigned is temporarily not in metadata and then discovered: member owns 
[t1-1], receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 
discovered shortly after). Wouldn't we generate 2 rebalances??? (a 1st one with 
no changes in assignment, a 2nd one with the added topic once discovered) when 
truly things only changed once)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1523909237


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList implements List {
 private final int maxLength;
 private final List underlying;
 
-public static  BoundedList newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static  BoundedList newArrayBacked(int maxLength, int 
initialCapacity) {

Review Comment:
   I'm not sure what you mean, could you please elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1523907637


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList implements List {
 private final int maxLength;
 private final List underlying;
 
-public static  BoundedList newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static  BoundedList newArrayBacked(int maxLength, int 
initialCapacity) {
-return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-}
-
-public BoundedList(int maxLength, List underlying) {
+public BoundedList(int maxLength) {
 if (maxLength <= 0) {
 throw new IllegalArgumentException("Invalid non-positive maxLength 
of " + maxLength);
 }
 this.maxLength = maxLength;
-if (underlying.size() > maxLength) {
-throw new BoundedListTooLongException("Cannot wrap list, because 
it is longer than " +
-"the maximum length " + maxLength);
-}
-this.underlying = underlying;
+this.underlying = new ArrayList<>(maxLength);

Review Comment:
   Sorry I am confused on what you want to change to `private`? Today, passing 
in the source list is only used in unit tests, not any source code. If it's 
only used for unit tests I think it should be removed so that tests interact 
with the BoundedList the same way the source code does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523907297


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
 markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): 
Unit = {
-val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition,
-new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: 
PendingCompleteTxnAndMarkerEntry): Unit = {
+val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition, {
+  info(s"Creating new marker queue for txn partition $txnTopicPartition to 
destination broker ${destination.id}")

Review Comment:
   I noticed in the doc:
   
   ```
   `createValue` may be invoked more than once if multiple threads attempt to 
insert a key at the same time
```
   
   This seems ok, but just wanted to call it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523903844


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1952,19 +1942,22 @@ private void 
assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
 // Should reset epoch to leave the group and release the assignment 
(right away because
 // there is no onPartitionsLost callback defined)
 verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
-assertTrue(membershipManager.currentAssignment().isEmpty());
+assertTrue(membershipManager.currentAssignment().isNone());
 assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
 assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
membershipManager.memberEpoch());
 }
 
 @Test
 public void 
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {

Review Comment:
   I cannot find any test for one of the 2 issues we're after with this PR: 
ensure that we are triggering the callbacks when joining and getting empty 
assignment. Could we add it? I expect it should be very similar to this one, 
but just providing a `CounterConsumerRebalanceListener` and asserting that it 
called the `onPartitionsAssigned`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523885245


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static 
member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-// RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining
+if (membershipManager.memberEpoch() == 0) {
 data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
 }
 
 if (!this.subscriptions.hasPatternSubscription()) {
-// SubscribedTopicNames - only sent if has changed since the 
last heartbeat
+// SubscribedTopicNames - only sent when joining or if has 
changed since the last heartbeat

Review Comment:
   inherited, but now that we're here: "or if **it** has changed..."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
 receiveAssignment(newAssignment, membershipManager);
 membershipManager.poll(time.milliseconds());
 
-verifyReconciliationNotTriggered(membershipManager);
+// We bumped the local epoch, so new reconciliation is triggered

Review Comment:
   This new reconciliation triggered is an example of what I was referring to 
in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) 
above, that seems to complicate the flow with 2 rebalances instead of 1. I 
wouldn't expect a member rebalancing/reconciling at this point (no assignment 
change for him yet, t2 not in metadata), and then reconciling/rebalance again 
once it discovers it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523880045


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void 
processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without 
creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current 
target assignment
+if (currentTargetAssignment.partitions.size() == 
assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> 
currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+
currentTargetAssignment.partitions.get(tp.topicId()).size() == 
tp.partitions().size() &&
+
currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions(
 {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
   I was wrong thinking that we could reuse the server epoch here, but it's not 
truly representing new assignments for the member, and we could end up 
receiving multiple new assignments in the same server epoch. So ok with this 
local epoch approach, best way to keep track of "versions" of the assignment 
received. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
 receiveAssignment(newAssignment, membershipManager);
 membershipManager.poll(time.milliseconds());
 
-verifyReconciliationNotTriggered(membershipManager);
+// We bumped the local epoch, so new reconciliation is triggered

Review Comment:
   This new reconciliation triggered is an example of what I was referring to 
in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) 
above, that seems to complicate the flow with 2 rebalances instead of 1. I 
wouldn't expect a member rebalancing/reconciling at this point (nothing changed 
for him yet, t2 not in metadata), and then reconciling/rebalance again once it 
discovers it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to 
topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet assignedTopicIdPartitions = 
findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new 
LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet ownedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized 
subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet assignedTopicPartitions = 
toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic 
IDs are properly
-// supported in the centralized subscription state object. Note that 
this check is
-// required to make sure that reconciliation is not triggered if the 
assignment ready to
-// be reconciled is the same as the current one (even though the 
member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = 
assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   well I was expecting we only need to trigger the callbacks if the assignment 
changed (could be to empty, but something needs to change), and that's not the 
case if the member ends up with t1-1 again, that it already owns. 
   
   By running a full reconciliation when the the resolved assignment is the 
same as the current but received later, we end up with a client reconciling the 
exact same assignment it already owns :S It would turn out noisy I expect, 
accounting for 2 rebalances in cases probably much more common, where a new 
topic assigned is temporarily not in metadata and then discovered: member owns 
[t1-1], receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 
discovered shortly after). We would generate 2 rebalances (a 1st one with no 
changes in assignment, a 2nd one with the added topic once discovered) when 
truly things only changed once). Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Change link to deprecated class to substitute class [kafka]

2024-03-13 Thread via GitHub


cadonna opened a new pull request, #15531:
URL: https://github.com/apache/kafka/pull/15531

   The docs for Interactive Queries contain a link to deprecated class 
StreamsMetadata in package org.apache.kafka.streams.state [1].
   
   This commit changed that link to class StreamsMetadata in package 
org.apache.kafka.streams [2] that replaces the deprecated class.
   
   [1] 
https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html
   [2] 
https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/StreamsMetadata.html
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-13 Thread via GitHub


OmniaGM commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1523840166


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
   LGTM 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-03-13 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826844#comment-17826844
 ] 

Phuc Hong Tran commented on KAFKA-15538:


[~lianetm] Sorry for the delay, I’ll try to get it done within this week.

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscribePattern`. The 
> subscribe using a java `Pattern` will be still supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1523802443


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   Good point about async window.
   
   Right now, the `KStreamKStreamJoinProcessor` does not know if it's a 
left-outer or full-outer join. However, we know inside `KStreamImplJoin` which 
is the only place in which we create `KStreamKStreamJoin`, so it should be easy 
to pass in this information into the Processor :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Kafka Streams docs fixes [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15517:
URL: https://github.com/apache/kafka/pull/15517#discussion_r1523775722


##
docs/streams/developer-guide/config-streams.html:
##
@@ -97,6 +97,7 @@
 
   Naming
   Default Values
+  Default Values

Review Comment:
   Thanks. Great catch. C&p error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-03-13 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1995368843

   @ijuma @mumrah @dajac @C0urante I'm still interested in having this change 
implemented, independent of hooking it into the CI build. PTAL, Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826827#comment-17826827
 ] 

Edoardo Comar commented on KAFKA-16369:
---

KafkaServer (ZKMode) needs to wait on the future returned by 
SocketServer.enableRequestProcessing

similarly to the BrokerServer (KRaft mode).

PR in progress

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Resolve SslContextFactory method deprecations [kafka]

2024-03-13 Thread via GitHub


gharris1727 merged PR #15468:
URL: https://github.com/apache/kafka/pull/15468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]

2024-03-13 Thread via GitHub


edoardocomar opened a new pull request, #15530:
URL: https://github.com/apache/kafka/pull/15530

   Add a Wait for all the SocketServer ports to be open, and the Acceptors to 
be started
   
   The BrokerServer (KRaft mode) had such a wait, which was missing from the 
KafkaServer (ZK mode).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Resolve SslContextFactory method deprecations [kafka]

2024-03-13 Thread via GitHub


gharris1727 commented on PR #15468:
URL: https://github.com/apache/kafka/pull/15468#issuecomment-1995349571

   Test failures appear unrelated, and the connect tests pass for me locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523734711


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) {
 List stateListeners() {
 return unmodifiableList(stateUpdatesListeners);
 }
+
+private final static class LocalAssignmentImpl implements LocalAssignment {
+
+private static final long NONE_EPOCH = -1;
+
+private static final LocalAssignmentImpl NONE = new 
LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap());
+
+private final long localEpoch;
+
+private final Map> partitions;
+
+public LocalAssignmentImpl(long localEpoch, Map> partitions) {
+this.localEpoch = localEpoch;
+this.partitions = partitions;
+if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if 
there are partitions");
+}
+}
+
+public LocalAssignmentImpl(long localEpoch, 
SortedSet topicIdPartitions) {
+this.localEpoch = localEpoch;
+this.partitions = new HashMap<>();
+if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+throw new IllegalArgumentException("Local epoch must be set if 
there are partitions");
+}
+topicIdPartitions.forEach(topicIdPartition -> {
+Uuid topicId = topicIdPartition.topicId();
+partitions.computeIfAbsent(topicId, k -> new 
TreeSet<>()).add(topicIdPartition.partition());
+});
+}
+
+@Override
+public String toString() {
+return "{" +
+"localEpoch=" + localEpoch +
+", partitions=" + partitions +
+'}';
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final LocalAssignmentImpl that = (LocalAssignmentImpl) o;
+return localEpoch == that.localEpoch && Objects.equals(partitions, 
that.partitions);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(localEpoch, partitions);
+}
+
+@Override
+public Map> getPartitions() {
+return partitions;
+}
+
+@Override
+public boolean isNone() {
+return localEpoch == NONE_EPOCH;
+}
+
+Optional 
updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+
+// Return if we have an assignment, and it is the same as current 
assignment; comparison without creating a new collection
+if (localEpoch != NONE_EPOCH) {
+// check if the new assignment is different from the current 
target assignment

Review Comment:
   nit: here we are checking if the new assignment is "the same as" the current 
(not diff)even maybe just remove this comment, as the one above seems 
enough? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523732734


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
 markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): 
Unit = {
-val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition,
-new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: 
PendingCompleteTxnAndMarkerEntry): Unit = {
+val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition, {
+  info(s"Creating new marker queue for txn partition $txnTopicPartition to 
destination broker ${destination.id}")
+  new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+})
+queue.add(pendingCompleteTxnAndMarker)
+
+if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+  // This could happen if the queue got removed concurrently.

Review Comment:
   Should we do something if we added to a "dead queue"? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523717757


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1504,6 +1507,8 @@ public void handleResponse(AbstractResponse response) {
 fatalError(error.exception());
 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
 abortableError(GroupAuthorizationException.forGroupId(key));
+} else if (error == Errors.ABORTABLE_TRANSACTION_EXCEPTION) {

Review Comment:
   The idea of this error is that we are future proofing. In the future if we 
want the producer to abort the transaction (say we have a new use case that 
requires this behavior) we can rely on this error to do the correct thing for 
older clients. (Or clients 3.8 and later in this case :) )
   
   We ran into this issue a lot when picking error codes for kip-890 part 1. If 
we had such an error then, we could have used it for the old clients. Instead, 
we chosen INVALID_TXN_STATE which has inconsistent behavior across clients.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523717757


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1504,6 +1507,8 @@ public void handleResponse(AbstractResponse response) {
 fatalError(error.exception());
 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
 abortableError(GroupAuthorizationException.forGroupId(key));
+} else if (error == Errors.ABORTABLE_TRANSACTION_EXCEPTION) {

Review Comment:
   The idea of this error is that we are future proofing. In the future if we 
want the producer to abort the transaction (say we have a new use case that 
requires this behavior) we can rely on this error to do the correct thing for 
older clients. 
   
   We ran into this issue a lot when picking error codes for kip-890 part 1. If 
we had such an error then, we could have used it for the old clients. Instead, 
we chosen INVALID_TXN_STATE which has inconsistent behavior across clients.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


CalvinConfluent commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523705762


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1504,6 +1507,8 @@ public void handleResponse(AbstractResponse response) {
 fatalError(error.exception());
 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
 abortableError(GroupAuthorizationException.forGroupId(key));
+} else if (error == Errors.ABORTABLE_TRANSACTION_EXCEPTION) {

Review Comment:
   Does FindCoordinatorRequest also return the ABORTABLE_TRANSACTION_EXCEPTION?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523709384


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static 
member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);

Review Comment:
   yes, that's the current expectation, so looks good. Let's just update the 
comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523702932


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Ideally we should drive the pagination from the cursor, rather than 
spreading the logic between multiple states.  We plan to get rid of topic list 
eventually and this logic would be easy to miss.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toM

[jira] [Assigned] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-16369:
-

Assignee: Edoardo Comar

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-16369:
--
Attachment: server.log
kraft-server.log

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826811#comment-17826811
 ] 

Edoardo Comar commented on KAFKA-16369:
---

server logs as broker only (zk mode) and broker/controller (kraft mode) when 
port 9092 is already bound

[^server.log][^kraft-server.log]

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: kraft-server.log, server.log
>
>
> When in ZooKeeper mode, if a port the broker should listen to is already bound,
> the KafkaException "Socket server failed to bind to localhost:9092: Address
> already in use." is thrown but the broker continues to start up.
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in ZooKeeper mode with server.config set to listen to
> localhost only:
> listeners=PLAINTEXT://localhost:9092
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826811#comment-17826811
 ] 

Edoardo Comar edited comment on KAFKA-16369 at 3/13/24 5:50 PM:


server logs as broker only (zk mode) and broker/controller (kraft mode) when 
port 9092 is already bound

[^server.log]

[^kraft-server.log]


was (Author: ecomar):
server logs as broker only (zk mode) and broker/controller (kraft mode) when 
port 9092 is already bound

[^server.log][^kraft-server.log]

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: kraft-server.log, server.log
>
>
> When in ZooKeeper mode, if a port the broker should listen to is already bound,
> the KafkaException "Socket server failed to bind to localhost:9092: Address
> already in use." is thrown but the broker continues to start up.
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in ZooKeeper mode with server.config set to listen to
> localhost only:
> listeners=PLAINTEXT://localhost:9092
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-16369:
-

 Summary: Broker may not shut down when SocketServer fails to bind 
as Address already in use
 Key: KAFKA-16369
 URL: https://issues.apache.org/jira/browse/KAFKA-16369
 Project: Kafka
  Issue Type: Bug
Reporter: Edoardo Comar


When in ZooKeeper mode, if a port the broker should listen to is already bound,
the KafkaException "Socket server failed to bind to localhost:9092: Address
already in use." is thrown but the broker continues to start up.

It correctly shuts down when in KRaft mode.

Easy to reproduce when in ZooKeeper mode with server.config set to listen to
localhost only:

listeners=PLAINTEXT://localhost:9092
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-13 Thread via GitHub


junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1523677272


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
-}
-
-if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-offsetOfMaxTimestamp = offsetCounter.value - 1;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   > It seems KIP-734 does not define the case we are discussing - return the
initial/last offset when there are multiple records having the same "max timestamp".
   
   Yes, I agree that we need to be consistent. Using the initial offset makes 
the most intuitive sense to me. We could update KIP-734 with this clarification 
and also send it to the original vote thread to make sure there is no objection.
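   
   For illustration, a minimal sketch of the "initial offset wins" convention
discussed above (a hypothetical standalone loop over an iterable of `Record`s,
not the actual LogValidator code):
   
   ```java
   long maxTimestamp = RecordBatch.NO_TIMESTAMP;
   long offsetOfMaxTimestamp = -1L;
   for (Record record : records) {
       // Strict '>' keeps the earliest (initial) offset when several records
       // share the same max timestamp.
       if (record.timestamp() > maxTimestamp) {
           maxTimestamp = record.timestamp();
           offsetOfMaxTimestamp = record.offset();
       }
   }
   ```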
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1523669497


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
   @OmniaGM @gharris1727 I'd like to ship it tomorrow if both of you have no 
objection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1523656781


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
 private final int maxLength;
 private final List<E> underlying;
 
-public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-}
-
-public BoundedList(int maxLength, List<E> underlying) {
+public BoundedList(int maxLength) {
 if (maxLength <= 0) {
 throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
 }
 this.maxLength = maxLength;
-if (underlying.size() > maxLength) {
-throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-"the maximum length " + maxLength);
-}
-this.underlying = underlying;
+this.underlying = new ArrayList<>(maxLength);

Review Comment:
   It seems to me that the large batch operation is not usual. It could be a
potential performance regression if we always create `ArrayList` with `1`
initial size.
   
   If the original constructor has a potential issue with callers retaining a reference to
`underlying`, we can change it from `public` to `private`.
   
   



##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
 private final int maxLength;
 private final List<E> underlying;
 
-public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {

Review Comment:
   Operations where we can ensure the size of the batch before processing can
use this method.
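   
   A minimal sketch of that shape, assuming we keep the static factories and make
the wrapping constructor private so no caller can retain a reference to
`underlying` (a sketch, not the final code):
   
   ```java
   public class BoundedList<E> implements List<E> {
       private final int maxLength;
       private final List<E> underlying;
   
       public static <E> BoundedList<E> newArrayBacked(int maxLength) {
           return new BoundedList<>(maxLength, new ArrayList<>());
       }
   
       public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
           return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
       }
   
       // Private: the backing list is always created here, so no outside
       // reference to it can exist.
       private BoundedList(int maxLength, List<E> underlying) {
           if (maxLength <= 0) {
               throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
           }
           this.maxLength = maxLength;
           this.underlying = underlying;
       }
   
       // ... the List<E> methods delegate to 'underlying', enforcing maxLength on add ...
   }
   ```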



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor: downgrade the minISR warning to debug [kafka]

2024-03-13 Thread via GitHub


jsancio commented on code in PR #15529:
URL: https://github.com/apache/kafka/pull/15529#discussion_r1523660494


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2189,7 +2189,7 @@ int getTopicEffectiveMinIsr(String topicName) {
 if (minIsrConfig != null) {
 currentMinIsr = Integer.parseInt(minIsrConfig);
 } else {
-log.warn("Can't find the min isr config for topic: " + topicName + 
" using default value " + defaultMinIsr);
+log.debug("Can't find the min isr config for topic: " + topicName 
+ " using default value " + defaultMinIsr);

Review Comment:
   This comment is for the `log.warn` message in line 2200.
   
   What exception are you trying to catch exactly? Is it a
`NullPointerException`? If so, shouldn't we throw the exception up the stack?
Based on the code, I would assume that if there is an error it means that the
controller is trying to find the replication factor for a topic that doesn't
exist, no?
   
   If you decide to keep the log message, let's log the exception instead of
logging the exception's `toString`:
   ```java
   log.warn("Can't find the replication factor for topic: " + topicName + " using default value " + replicationFactor, e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-13 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1523649729


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()

Review Comment:
   Why did we exclude the LogAppendTime test in this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]

2024-03-13 Thread via GitHub


msn-tldr commented on PR #15498:
URL: https://github.com/apache/kafka/pull/15498#issuecomment-1995048622

   The tests failing on Jenkins are unrelated, flaky tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


mumrah commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523630806


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -404,21 +414,33 @@ final class KafkaMetadataLog private (
* all cases.
*
* For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
+   * The predicate returns a Some with the reason if the snapshot should be 
deleted and a None if the snapshot
+   * should not be deleted
*/
-  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
-if (snapshots.size < 2)
+  private def cleanSnapshots(predicate: OffsetAndEpoch => 
Option[SnapshotDeletionReason]): Boolean = {
+if (snapshots.size < 2) {
   return false
+}
 
 var didClean = false
 snapshots.keys.toSeq.sliding(2).foreach {
   case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
-if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
-  didClean = true
-} else {
-  return didClean
+predicate(snapshot) match {
+  case Some(reason) =>
+if (deleteBeforeSnapshot(nextSnapshot, reason)) {

Review Comment:
   I assume `&&` will short-circuit in Scala like it does in Java (in which 
case this change looks correct)



##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
   Snapshots.deleteIfExists(logDir, snapshotId)
 }
   }
+
+  private sealed trait SnapshotDeletionReason {
+def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, 
retentionMillis: Long) extends SnapshotDeletionReason {
+override def reason(snapshotId: OffsetAndEpoch): String = {
+  s"""Marking snapshot $snapshotId for deletion because it timestamp 
($timestamp) is now ($now) older than the
+  |retention ($retentionMillis""".stripMargin

Review Comment:
   is there a missing ")" here, or should we not have the opening "("?
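   
   For reference, a possible corrected form with the parenthesis closed (and the
"it timestamp" typo fixed):
   
   ```scala
   s"Marking snapshot $snapshotId for deletion because its timestamp ($timestamp) is now ($now) older than the retention ($retentionMillis)"
   ```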



##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -348,14 +357,15 @@ final class KafkaMetadataLog private (
   snapshotId.offset <= latestSnapshotId.offset &&
   log.maybeIncrementLogStartOffset(snapshotId.offset, 
LogStartOffsetIncrementReason.SnapshotGenerated) =>
 // Delete all segments that have a "last offset" less than the log 
start offset
-log.deleteOldSegments()
+val deletedSegments = log.deleteOldSegments()
 // Remove older snapshots from the snapshots cache
-(true, forgetSnapshotsBefore(snapshotId))
+val forgottenSnapshots = forgetSnapshotsBefore(snapshotId)
+(deletedSegments != 0 || forgottenSnapshots.nonEmpty, 
forgottenSnapshots)

Review Comment:
   Interesting, did we have a bug here previously? Looks like before we would 
always report `deleted=true` even if nothing was deleted. I wonder if we even 
use this return value 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523608929


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,14 +42,26 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+/**
+ * This is an enum which handles the Partition Response based on the Produce 
Request Version and the exact operation
+ *defaultOperation:   This is the default workflow which maps to cases 
when the Produce Request Version was lower than expected or when exercising the 
offset commit request path

Review Comment:
   It is a similar process to pass in the value, but we will need it for
both group coordinator paths. Let me know if you need help.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523603267


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,14 +42,26 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+/**
+ * This is an enum which handles the Partition Response based on the Produce 
Request Version and the exact operation
+ *defaultOperation:   This is the default workflow which maps to cases 
when the Produce Request Version was lower than expected or when exercising the 
offset commit request path

Review Comment:
   I think we should include this change in the offset commit path. It will 
require bumping that request version as well. 
   
   > We will bump the ProduceRequest/Response and 
TxnOffsetCommitRequest/Response version to indicate the client is using the new 
protocol that doesn’t require adding partitions to transactions and will 
implicitly do so. The bump will also support new errors ABORTABLE_ERROR
   
   We are just doing 2 bumps for the above comment 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523597267


##
clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AbortableTransactionException extends ApiException {

Review Comment:
   I'm still thinking on this name as well. Since all transactions are 
technically abortable, and what we want to convey is that the only way forward 
is to abort, would it make sense to say TransactionAbortableException?
   
   Maybe it doesn't make a huge difference though. 😅 Wondering if there is a
way to make that distinction between "abortable" and "we need to abort to proceed". I
realize though that we are also modeling this off of RetriableException here and
trying to keep that similar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Minor: downgrade the minISR warning to debug [kafka]

2024-03-13 Thread via GitHub


CalvinConfluent opened a new pull request, #15529:
URL: https://github.com/apache/kafka/pull/15529

   Downgrade the log level to avoid spamming the log


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts

2024-03-13 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16226:
--
Fix Version/s: 3.6.2
   (was: 3.6.3)

> Java client: Performance regression in Trogdor benchmark with high partition 
> counts
> ---
>
> Key: KAFKA-16226
> URL: https://issues.apache.org/jira/browse/KAFKA-16226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
>  Labels: kip-951
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png
>
>
> h1. Background
> https://issues.apache.org/jira/browse/KAFKA-15415 implemented optimisation in 
> java-client to skip backoff period if client knows of a newer leader, for 
> produce-batch being retried.
> h1. What changed
> The implementation introduced a regression noticed on a trogdor-benchmark 
> running with high partition counts(36000!).
> With regression, following metrics changed on the produce side.
>  # record-queue-time-avg: increased from 20ms to 30ms.
>  # request-latency-avg: increased from 50ms to 100ms.
> h1. Why it happened
> As can be seen from the original
> [PR|https://github.com/apache/kafka/pull/14384],
> RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using the
> synchronised method Metadata.currentLeader(). This has led to increased
> synchronisation between the KafkaProducer's application thread that calls send()
> and the background thread that actively sends producer batches to leaders.
> Lock profiles clearly show increased synchronisation in the KAFKA-15415
> PR (highlighted in red) vs baseline (see below). Note that
> the synchronisation is much worse for partitionReady() in this benchmark as
> it's called for each partition, and it has 36k partitions!
> h3. Lock Profile: Kafka-15415
> !kafka_15415_lock_profile.png!
> h3. Lock Profile: Baseline
> !baseline_lock_profile.png!
> h1. Fix
> Synchronisation between the two threads has to be reduced in order to address this.
> [https://github.com/apache/kafka/pull/15323] is a fix for it, as it avoids
> using Metadata.currentLeader() and instead relies on Cluster.leaderFor().
> With the fix, the lock profile & metrics are similar to baseline.
>  
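> For illustration, a schematic contrast of the two access patterns (hypothetical
> skeletons; only Metadata.currentLeader() and Cluster.leaderFor() are the real APIs):
> {code:java}
> // Before: every readiness check funnels through one monitor; the application
> // thread calling send() and the sender thread contend here, once per partition.
> synchronized LeaderAndEpoch currentLeader(TopicPartition tp) {
>     return snapshot.leaderAndEpoch(tp); // hypothetical body
> }
>
> // After: read the leader from the immutable Cluster snapshot; no lock is taken.
> Node leaderFor(TopicPartition tp) {
>     return partitionLeaders.get(tp); // lock-free read of an immutable map
> }
> {code}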



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-13 Thread via GitHub


ChrisAHolland commented on PR #15507:
URL: https://github.com/apache/kafka/pull/15507#issuecomment-1994913616

   @showuon @jolshan Would appreciate if you could take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1523586414


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -392,7 +393,8 @@ public enum Errors {
 UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
 UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an 
invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
 TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than 
the maximum size the broker will accept.", TelemetryTooLargeException::new),
-INVALID_REGISTRATION(119, "The controller has considered the broker 
registration to be invalid.", InvalidRegistrationException::new);
+INVALID_REGISTRATION(119, "The controller has considered the broker 
registration to be invalid.", InvalidRegistrationException::new),
+ABORTABLE_TRANSACTION_EXCEPTION(120, "This maps to all errors which can be 
fixed by aborting the current transaction.", 
AbortableTransactionException::new);

Review Comment:
   nit: let's leave off the "exception" part to be consistent with the other 
errors here.
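   
   Taken together with the TransactionAbortableException name floated earlier in
this review, the entry might then read (a hypothetical final form, not the merged
code):
   
   ```java
   TRANSACTION_ABORTABLE(120, "This maps to all errors which can be fixed by aborting the current transaction.", TransactionAbortableException::new);
   ```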



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Only enable replay methods to modify timeline data structure [kafka]

2024-03-13 Thread via GitHub


dongnuo123 commented on code in PR #15528:
URL: https://github.com/apache/kafka/pull/15528#discussion_r1523584782


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -593,7 +595,8 @@ public List describeGroups(
  */
 ConsumerGroup getOrMaybeCreateConsumerGroup(
 String groupId,
-boolean createIfNotExists
+boolean createIfNotExists,
+boolean addToGroupsMap

Review Comment:
   Yeah, you're right. I added the flag to reduce redundancy but it does bring a
risk of misuse... Let me change this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Only enable replay methods to modify timeline data structure [kafka]

2024-03-13 Thread via GitHub


dajac commented on code in PR #15528:
URL: https://github.com/apache/kafka/pull/15528#discussion_r1523579678


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -593,7 +595,8 @@ public List describeGroups(
  */
 ConsumerGroup getOrMaybeCreateConsumerGroup(
 String groupId,
-boolean createIfNotExists
+boolean createIfNotExists,
+boolean addToGroupsMap

Review Comment:
   I am not a fan of this new boolean flag because there is a chance of
misusing it. Have you considered keeping `getOrMaybeCreateConsumerGroup` on the
`consumerGroupHeartbeat` path (without the part which adds to the map) and
changing the replay logic?
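   
   A minimal sketch of that suggestion (the second method name and the constructor
arguments are hypothetical): the heartbeat path keeps a lookup free of side effects
on the groups map, while replay gets its own creator that registers the group.
   
   ```java
   // Heartbeat path: may create a group object, but never adds it to the
   // groups map as a side effect.
   ConsumerGroup getOrMaybeCreateConsumerGroup(String groupId, boolean createIfNotExists) {
       Group group = groups.get(groupId);
       if (group == null && createIfNotExists) {
           return new ConsumerGroup(snapshotRegistry, groupId, metrics); // assumed constructor
       }
       return (ConsumerGroup) group;
   }
   
   // Replay path: creating here also registers the group, since replayed
   // records are the source of truth for the timeline data structures.
   ConsumerGroup getOrCreatePersistedConsumerGroup(String groupId) {
       return (ConsumerGroup) groups.computeIfAbsent(groupId,
           id -> new ConsumerGroup(snapshotRegistry, id, metrics));
   }
   ```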



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-13 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1523572950


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+List<Record> records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   Yeah, this could potentially happen. That's the tradeoff that we have to 
make here. However, I would like to point out that on the client, the partition 
is considered assigned even if the client may not be able to fetch from it 
(e.g. in case of stale metadata) so the contract is loose anyway here.
   
   If it turns out to be an issue in the future, I think that we could have a
timer for individual partitions. I refrained from doing this because it
brings a lot of complexity and it does not seem worth it at the moment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-13 Thread via GitHub


OmniaGM commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1523571839


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
I can't see why we care about the reference in some of these tests, but if this is
the intent then keeping `assertSame` is okay. My comment was more a question of
whether we need the test to be this explicit or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-13 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1523564661


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+List<Record> records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   Ah. I was actually just investigating an incident where we had an issue with 
a deleted partition with the old protocol. 😅 
   
   So this makes sense. I guess my only remaining question is whether there is
a case where we could get stuck not consuming an existing partition that is
assigned to a live member, where that member is unable to get the assignment
but is still able to heartbeat. This is the case where an assignment timeout could
also free this assigned partition to be assigned to another member that may be
able to take it.
   
   I agree though that it is better in the current state (not blocking the
members from getting to the reconciliation etc.).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-13 Thread via GitHub


gharris1727 commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1523535566


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
   When I wrote this test originally, I did intentionally use assertSame to do 
a reference equality check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-03-13 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16217:
--
Fix Version/s: 3.8.0
   3.6.3
   (was: 3.6.2)

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Assignee: Kirk True
>Priority: Major
>  Labels: transactions
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional logging, I found the root cause. If the producer is in a bad
> transaction state (in my case, TransactionManager.pendingTransition was
> set to commitTransaction and did not get cleaned) and then calls
> close and tries to abort the existing transaction, it will get
> stuck aborting the transaction. It is related to the fix
> [https://github.com/apache/kafka/pull/13591].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts

2024-03-13 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16226:
--
Fix Version/s: 3.6.3
   (was: 3.6.2)

> Java client: Performance regression in Trogdor benchmark with high partition 
> counts
> ---
>
> Key: KAFKA-16226
> URL: https://issues.apache.org/jira/browse/KAFKA-16226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mayank Shekhar Narula
>Assignee: Mayank Shekhar Narula
>Priority: Major
>  Labels: kip-951
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
> Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png
>
>
> h1. Background
> https://issues.apache.org/jira/browse/KAFKA-15415 implemented optimisation in 
> java-client to skip backoff period if client knows of a newer leader, for 
> produce-batch being retried.
> h1. What changed
> The implementation introduced a regression noticed on a trogdor-benchmark 
> running with high partition counts(36000!).
> With regression, following metrics changed on the produce side.
>  # record-queue-time-avg: increased from 20ms to 30ms.
>  # request-latency-avg: increased from 50ms to 100ms.
> h1. Why it happened
> As can be seen from the original
> [PR|https://github.com/apache/kafka/pull/14384],
> RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using the
> synchronised method Metadata.currentLeader(). This has led to increased
> synchronisation between the KafkaProducer's application thread that calls send()
> and the background thread that actively sends producer batches to leaders.
> Lock profiles clearly show increased synchronisation in the KAFKA-15415
> PR (highlighted in red) vs baseline (see below). Note that
> the synchronisation is much worse for partitionReady() in this benchmark as
> it's called for each partition, and it has 36k partitions!
> h3. Lock Profile: Kafka-15415
> !kafka_15415_lock_profile.png!
> h3. Lock Profile: Baseline
> !baseline_lock_profile.png!
> h1. Fix
> Synchronisation between the two threads has to be reduced in order to address this.
> [https://github.com/apache/kafka/pull/15323] is a fix for it, as it avoids
> using Metadata.currentLeader() and instead relies on Cluster.leaderFor().
> With the fix, the lock profile & metrics are similar to baseline.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523523053


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last 
heartbeat. Note that
-// the string consists of just the topic ID and the partitions. 
When an assignment is
-// received, we might not yet know the topic name, and then it is 
learnt subsequently
-// by a metadata update.
-TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new 
assignment from the server was
+// reconciled. This is ensured by resending the topic partitions 
whenever the local assignment,
+// including its local epoch is changed (although the local epoch 
is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   Renamed the field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16120) Fix partition reassignment during ZK migration

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16120:
-
Fix Version/s: 3.6.2

> Fix partition reassignment during ZK migration
> --
>
> Key: KAFKA-16120
> URL: https://issues.apache.org/jira/browse/KAFKA-16120
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> When a reassignment is completed in ZK migration hybrid mode, the 
> `StopReplica` sent by the kraft quorum migration propagator is sent with 
> `delete = false` for deleted replicas when processing the topic delta. This 
> results in stray replicas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16120) Fix partition reassignment during ZK migration

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16120:
-
Affects Version/s: 3.6.1
   3.6.0

> Fix partition reassignment during ZK migration
> --
>
> Key: KAFKA-16120
> URL: https://issues.apache.org/jira/browse/KAFKA-16120
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.6.1
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> When a reassignment is completed in ZK migration hybrid mode, the 
> `StopReplica` sent by the kraft quorum migration propagator is sent with 
> `delete = false` for deleted replicas when processing the topic delta. This 
> results in stray replicas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed 
since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || 
sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   Implemented both changes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522030


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -210,12 +213,12 @@ public class MembershipManagerImpl implements 
MembershipManager {
 private final Map assignedTopicNamesCache;
 
 /**
- * Topic IDs and partitions received in the last target assignment. Items 
are added to this set
- * every time a target assignment is received. This is where the member 
collects the assignment
- * received from the broker, even though it may not be ready to fully 
reconcile due to missing
- * metadata.
+ * Topic IDs and partitions received in the last target assignment, 
together with its local epoch.
+ *
+ * This member reassigned every time a new assignment is received.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15689) KRaftMigrationDriver not logging the skipped event when expected state is wrong

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15689:
-
Fix Version/s: 3.6.2

> KRaftMigrationDriver not logging the skipped event when expected state is 
> wrong
> ---
>
> Key: KAFKA-15689
> URL: https://issues.apache.org/jira/browse/KAFKA-15689
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 3.7.0, 3.6.2
>
>
> The KRaftMigrationDriver.checkDriverState is used in multiple implementations
> of the
> MigrationEvent base class, but when it comes to logging that an event was skipped
> because the expected state is wrong, it always logs "KRaftMigrationDriver"
> instead of the skipped event.
> For example, a logging line could be like this:
> {code:java}
> 2023-10-25 12:17:25,460 INFO [KRaftMigrationDriver id=5] Expected driver 
> state ZK_MIGRATION but found SYNC_KRAFT_TO_ZK. Not running this event 
> KRaftMigrationDriver. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-5-migration-driver-event-handler] {code}
> This is because its code has something like this:
> {code:java}
> log.info("Expected driver state {} but found {}. Not running this event {}.",
> expectedState, migrationState, this.getClass().getSimpleName()); {code}
> Of course, the "this" is referring to the KRaftMigrationDriver class.
> It should print the specific skipped event instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-13 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523520659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
-membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-data.setInstanceId(groupInstanceId);
-sentFields.instanceId = groupInstanceId;
-}
-});
+// InstanceId - always send when leaving the group as a static 
member
+membershipManager.groupInstanceId().ifPresent(data::setInstanceId);

Review Comment:
   I updated the instance ID to be sent all the time when present. I think the
latest information was that the GC relies on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-16171.
--
Resolution: Fixed

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> --
>
> Key: KAFKA-16171
> URL: https://issues.apache.org/jira/browse/KAFKA-16171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft, migration
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.2, 3.7.0
>
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a 
> state called hybrid mode. This means we have a mixture of ZK and KRaft 
> brokers. The KRaft controller updates the ZK brokers using the deprecated 
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc). 
>  
> A race condition exists where the KRaft controller will get stuck in a retry 
> loop while initializing itself after a failover which prevents it from 
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK 
> brokers will not receive any metadata updates. The ZK brokers will be able to 
> send requests to the controller (such as AlterPartitions), but the metadata 
> updates which come as a result of those requests will never be seen. This 
> essentially looks like the controller is unavailable from the ZK brokers 
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected 
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This 
> indicates that another KRaft controller is making writes to ZooKeeper. {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. 
> Expected zkVersion = 507823. This indicates that another KRaft controller is 
> making writes to ZooKeeper.
>   at 
> kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>   at 
> kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>   at 
> kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>   at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>   at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>   at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>   at 
> kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>   at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:1583)
>   at 
> org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can 
> be done by restarting the problematic active controller. To verify that the 
> new c
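
As background on the error above, a minimal sketch of the fencing pattern it comes from (an assumed shape, not the actual KafkaZkClient code): migration writes are bundled into a ZooKeeper multi() guarded by a check op on the migration ZNode's zkVersion, so a write from a stale controller fails atomically.

{code:java}
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class FencedMigrationWriteSketch {
    // Paths and the expected version are illustrative.
    static void fencedCreate(ZooKeeper zk, int expectedZkVersion) throws Exception {
        List<Op> ops = List.of(
            // Fails the whole transaction if another controller has bumped
            // the migration ZNode's version since we last read it.
            Op.check("/migration", expectedZkVersion),
            Op.create("/brokers/topics/example", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        try {
            zk.multi(ops);
        } catch (KeeperException.BadVersionException e) {
            // Surfaces as "Check op on KRaft Migration ZNode failed. ..."
            throw new RuntimeException(
                "Check op on migration ZNode failed. Expected zkVersion = " + expectedZkVersion, e);
        }
    }
}
{code}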

[jira] [Updated] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-03-13 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16139:
--
Fix Version/s: 3.6.2
   (was: 3.6.1)

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>  Components: streams, system tests
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces
> Arguments: {"from_version": "3.5.1", "to_version": "3.7.0-SNAPSHOT"}
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available

2024-03-13 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-15817:
--
Fix Version/s: 3.6.2

> Avoid reconnecting to the same IP address if multiple addresses are available
> -
>
> Key: KAFKA-15817
> URL: https://issues.apache.org/jira/browse/KAFKA-15817
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS 
> resolution behavior for clients to re-resolve DNS after disconnecting from a 
> broker, rather than waiting until we have iterated over all addresses from a given 
> resolution. This is useful when the IP addresses have changed between the 
> connection and disconnection.
> However, with the behavior change, this does mean that clients could 
> potentially reconnect immediately to the same IP they just disconnected from, 
> if the IPs have not changed. In cases where the disconnection happened 
> because that IP was unhealthy (such as a case where a load balancer has 
> instances in multiple availability zones and one zone is unhealthy, or a case 
> where an intermediate component in the network path is going through a 
> rolling restart), this will delay the client from successfully reconnecting. To 
> address this, clients should remember the IP they just disconnected from and 
> skip that IP when reconnecting, as long as the hostname resolved to multiple 
> addresses.
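
A minimal sketch of the proposed selection logic (illustrative, not the actual client networking code): re-resolve on reconnect, but skip the address that just failed whenever the hostname resolves to more than one address.

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

public class ReconnectAddressSketch {
    private InetAddress lastFailedAddress;

    // Record the address of a dropped connection so the next attempt can avoid it.
    void onDisconnect(InetAddress address) {
        lastFailedAddress = address;
    }

    InetAddress chooseAddress(String host) throws UnknownHostException {
        List<InetAddress> resolved = Arrays.asList(InetAddress.getAllByName(host));
        // With a single resolved address there is nothing to skip.
        if (resolved.size() == 1 || lastFailedAddress == null) {
            return resolved.get(0);
        }
        // Prefer any address other than the one that just failed; fall back
        // to the first address if every resolved address matches it.
        return resolved.stream()
            .filter(a -> !a.equals(lastFailedAddress))
            .findFirst()
            .orElse(resolved.get(0));
    }
}
{code}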



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16171:
-
Fix Version/s: 3.6.2

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> --
>
> Key: KAFKA-16171
> URL: https://issues.apache.org/jira/browse/KAFKA-16171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft, migration
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.7.0, 3.6.2
>
