Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
kamalcph commented on PR #15523: URL: https://github.com/apache/kafka/pull/15523#issuecomment-1996675858
Thanks for the review!
> If the cleaner thread wasn't even started, are we really testing correct behaviour? For example, when we validate that cleaner shouldn't have deleted something, we would happily think that cleaner ran but didn't delete stuff whereas in reality, it may never have run at all. Should we instead add a wait loop in our setup for cache in the test to ensure that cleaner starts running?
The RemoteIndexCache (RIC) and RemoteIndexCacheTest (RICT) are in different packages, so we cannot introduce a new package-private method in RIC and override it in RICT to ensure that the cleaner thread has started. Also, ShutdownableThread is widely used by various modules, so I am not changing the behavior of the thread.
1. The ShutdownableThread source code is in Java, so I thought to write the new unit tests in Java.
2. In this test, we don't expect any entries to expire. The test fails 2% of the time, which means that the remaining 98% of the time the cleaner thread ran. In the assertion, we ensure that the cleaner thread closes correctly in both cases.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
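The wait loop the reviewer asks about could, in a generic setting, be sketched with a latch that the background thread releases once its run loop begins. This is only an illustration under stated assumptions: `CleanerThread`, `awaitStarted`, and `CleanerStartDemo` are hypothetical names, and nothing here is the API of ShutdownableThread or RemoteIndexCache (the comment above explains why such a hook was not added there).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a background cleaner; NOT Kafka's ShutdownableThread.
class CleanerThread extends Thread {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch shutdownRequested = new CountDownLatch(1);

    CleanerThread() {
        super("hypothetical-cleaner");
        setDaemon(true);
    }

    @Override
    public void run() {
        // Signal that the run loop has actually begun.
        started.countDown();
        try {
            // Stand-in for periodic cleanup work: park until shutdown is requested.
            shutdownRequested.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the thread entered run() within the timeout.
    boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException {
        return started.await(timeout, unit);
    }

    // Requests shutdown and waits for the thread to exit.
    void shutdown() throws InterruptedException {
        shutdownRequested.countDown();
        join();
    }
}

class CleanerStartDemo {
    // Starts the cleaner, waits until it is known to be running, then closes it.
    static boolean startAndStop() {
        CleanerThread cleaner = new CleanerThread();
        cleaner.start();
        try {
            boolean ran = cleaner.awaitStarted(5, TimeUnit.SECONDS);
            cleaner.shutdown();
            return ran && !cleaner.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cleaner started and stopped: " + startAndStop());
    }
}
```

With a hook like this, a test can distinguish "the cleaner ran and deleted nothing" from "the cleaner never ran", which is the distinction the review question is about.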
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
artemlivshits commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1524022035
## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ##
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 }
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-      new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
+      new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+    })
+    queue.add(pendingCompleteTxnAndMarker)
+
+    if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+      // This could happen if the queue got removed concurrently.
Review Comment: As far as I can see, it shouldn't affect the user-visible behavior. It does create an interesting state when the queue is removed in removeMarkersForTxnTopicPartition. We could have:
1. [addMarkers] Retrieve queue.
2. [removeMarkersForTxnTopicPartition] Remove queue.
3. [removeMarkersForTxnTopicPartition] Iterate over queue, but not removeMarkersForTxn because queue is empty.
4. [addMarkers] Add markers to the queue.
Now we've effectively removed the markers while transactionsWithPendingMarkers has an entry. This state could last for a while if the removal happened on unload (and technically the txn id could expire, etc., so this state may persist indefinitely until broker restart), but as soon as a real workflow that sends out markers happens on this txn id, the proper entry will be created and the actual functionality will work as expected. In other words, this race can lead to an orphan entry in transactionsWithPendingMarkers, but it doesn't affect anything (other than leaking a small amount of memory) until the markers are sent, and sending markers will fix it.
## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala: ##
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
       val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-      for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
Review Comment: I agree this code could benefit from some refactoring: we should probably structure it so that, instead of branching on wasDisconnected at the top, it just iterates over the pending entries and checks wasDisconnected in specific cases. But I think that should be done separately; this change is fairly mechanical, which simplifies reviewing what it does.
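The four-step interleaving between addMarkers and removeMarkersForTxnTopicPartition described above can be replayed deterministically on a single thread. The sketch below is a simplified model, not the Kafka classes: `MarkerQueueRaceSketch`, `replayInterleaving`, and the use of `String` in place of the marker entry type are assumptions made for illustration, and `computeIfAbsent` merely stands in for `CoreUtils.atomicGetOrUpdate`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified, single-threaded replay of the interleaving; NOT the Kafka classes.
class MarkerQueueRaceSketch {
    // Stand-in for markersPerTxnTopicPartition (String replaces the marker entry type).
    static final ConcurrentHashMap<Integer, BlockingQueue<String>> markersPerPartition =
        new ConcurrentHashMap<>();

    // Returns true when the marker ends up in a queue that is no longer registered in the map.
    static boolean replayInterleaving(int partition, String marker) {
        // 1. [addMarkers] Retrieve (or create) the queue.
        BlockingQueue<String> queue =
            markersPerPartition.computeIfAbsent(partition, p -> new LinkedBlockingQueue<>());

        // 2. [removeMarkersForTxnTopicPartition] Remove the queue from the map.
        BlockingQueue<String> removed = markersPerPartition.remove(partition);

        // 3. [removeMarkersForTxnTopicPartition] Inspect the removed queue: it is still
        //    empty, so there is nothing to clean up.
        int cleanedUp = removed.size();

        // 4. [addMarkers] Add the marker to the queue obtained in step 1.
        queue.add(marker);

        // The check from the diff: is the queue we wrote to still the registered one?
        boolean orphaned = markersPerPartition.get(partition) != queue;
        return cleanedUp == 0 && orphaned;
    }

    public static void main(String[] args) {
        System.out.println("marker orphaned: " + replayInterleaving(7, "txn-1-marker"));
    }
}
```

The identity check at the end mirrors the `markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue` condition added in the diff, which is how the real code notices that the queue was removed concurrently.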
Re: [PR] MINOR: Kafka Streams docs fixes [kafka]
mjsax merged PR #15517: URL: https://github.com/apache/kafka/pull/15517
Re: [PR] MINOR: simplify consumer logic [kafka]
mjsax merged PR #15519: URL: https://github.com/apache/kafka/pull/15519
Re: [PR] KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] [kafka]
mjsax commented on code in PR #15495: URL: https://github.com/apache/kafka/pull/15495#discussion_r1524038413
## docs/streams/developer-guide/memory-mgmt.html: ##
@@ -151,6 +151,10 @@ Serdes.Long())
 .withCachingEnabled();
 Record caches are not supported for versioned state stores.
+ Caution: When using withCachingEnabled(),
+if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,
+the value of the key that hasn't been flushed from the cache may be returned as a stale value.
+Therefore, when deleting, you must flush() the cache before the iterator.
Review Comment: I would rephrase this sentence.
```
To avoid reading stale data, you can `flush()` the store before creating the iterator. Note that flushing too often can lead to performance degradation if RocksDB is used, so we advise avoiding manual flushes in general.
```
## docs/streams/developer-guide/memory-mgmt.html: ##
@@ -151,6 +151,10 @@ Serdes.Long())
 .withCachingEnabled();
 Record caches are not supported for versioned state stores.
+ Caution: When using withCachingEnabled(),
+if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,
Review Comment: I cannot remember from the ticket: was this limited to deletes, or did it also affect updates?
## docs/streams/developer-guide/memory-mgmt.html: ##
@@ -151,6 +151,10 @@ Serdes.Long())
 .withCachingEnabled();
 Record caches are not supported for versioned state stores.
+ Caution: When using withCachingEnabled(),
+if you delete() a key from the Store while iterating(e.g., KeyValueIterator) through the keys,
Review Comment:
```suggestion
if you delete() a key from the store while iterating (e.g., KeyValueIterator) through the keys,
```
Re: [PR] MINOR: Fix descriptions of raft-metrics in docs/ops.html [kafka]
github-actions[bot] commented on PR #14973: URL: https://github.com/apache/kafka/pull/14973#issuecomment-1996325738 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] Fix so that a partition is retained, if the another parititon on same… [kafka]
github-actions[bot] commented on PR #15019: URL: https://github.com/apache/kafka/pull/15019#issuecomment-1996325711 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524147955
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment: Updated
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524143322
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment: Sure, sounds good to me
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524140865
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment:
> I like the idea of migrating newArrayBacked(int maxLength) to newLinkedListBacked(int maxLength), this would also help cut down on array list resizing time, as currently this constructor creates a default array list, which is size 10
I did not suggest using a linked list for the current usage 🫢 My point was that keeping the constructor gives us the flexibility to change the list implementation in the future. We need to consider the cost of iteration before adopting a linked list. To simplify this PR and address the original proposal, changing the scope from public to private is good enough. WDYT?
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1996246082 @chia7712 @jolshan Pushed an update based on discussion
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524100301
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment: I like the idea of migrating `newArrayBacked(int maxLength)` to `newLinkedListBacked(int maxLength)`. This would also help cut down on array list resizing time, as this constructor currently creates a default array list, whose default capacity is 10.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524078277
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment:
1. Maybe `private` is enough. Another view is that we can have `newListBacked` for more flexible usage in the future, with the implementation based on the constructor that takes a passed-in list.
```java
public static <E> BoundedList<E> newListBacked(int maxLength) {
    return new BoundedList<>(maxLength, new LinkedList<>());
}
```
2. "ArrayList resizing operations don't seem to be a concern to people." -> The caller can fix it by defining "initialCapacity".
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524075123
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment: I think so. It looks like the consensus is:
1. Remove constructor that takes in a list.
2. Keep both versions of `newArrayBacked`, as ArrayList resizing operations don't seem to be a concern to people.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524073993
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);
Review Comment: Got it, so we can keep the existing `newArrayBacked()` factory methods and get rid of the constructor that takes in a list.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
jolshan commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524070820
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);
Review Comment: Yeah, it seems like we want to make the constructor private and use the newArrayBacked methods to accomplish what we want. I think we can all agree directly passing in the underlying arraylist is not what we want 👍
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
jolshan commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524069898
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
Review Comment: My understanding here is that we do want to know an expected size to start out with, but we shouldn't necessarily link it directly to an array we are passing in as the constructor does. Does that mean the deleted code here is actually the behavior we want?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1524067155
## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,236 @@ package kafka.admin
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
Review Comment: That's because the fix for "returning the first offset when multiple records have the same maxTimestamp" for non-compressed records is in this PR: https://github.com/apache/kafka/pull/15476. We should add the logAppendTime test there. :) cc @johnnychhsu
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524061026
## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
     private final int maxLength;
     private final List<E> underlying;
-    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<>());
-    }
-
-    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-    }
-
-    public BoundedList(int maxLength, List<E> underlying) {
+    public BoundedList(int maxLength) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {
-            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
-        }
-        this.underlying = underlying;
+        this.underlying = new ArrayList<>(maxLength);
Review Comment: Changing the access scope is meant to fix the issue you described:
> This is problematic because the constructor created the BoundedList using a reference to underlying, not a copy of it; therefore, a user could add elements to underlying instead of their newly instantiated BoundedList and force the BoundedList to be larger than its maxLength.
We should prevent that constructor from being used in code, so making it `private` is good practice. All callers should use `newArrayBacked` to create a `BoundedList`.
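The aliasing problem quoted above, and the effect of hiding the constructor behind factory methods, can be illustrated with a reduced model. This is a sketch under stated assumptions, not the real `org.apache.kafka.server.mutable.BoundedList`: `BoundedListSketch`, `wrapUnsafely`, and `BoundedListAliasingDemo` are hypothetical names, only `add` and `size` are modelled, and `IllegalStateException` stands in for `BoundedListTooLongException`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch for illustration; NOT the real BoundedList.
class BoundedListSketch<E> {
    private final int maxLength;
    private final List<E> underlying;

    // Private: outside callers cannot hand in a list they still hold a reference to.
    private BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    static <E> BoundedListSketch<E> newArrayBacked(int maxLength) {
        return new BoundedListSketch<>(maxLength, new ArrayList<>());
    }

    static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
        return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
    }

    // Hypothetical: mimics the old public constructor, which wrapped the list by reference.
    static <E> BoundedListSketch<E> wrapUnsafely(int maxLength, List<E> list) {
        return new BoundedListSketch<>(maxLength, list);
    }

    boolean add(E e) {
        if (underlying.size() >= maxLength) {
            // Stand-in for BoundedListTooLongException.
            throw new IllegalStateException("Cannot add another element: maximum length is " + maxLength);
        }
        return underlying.add(e);
    }

    int size() {
        return underlying.size();
    }
}

class BoundedListAliasingDemo {
    // The caller keeps a reference to the wrapped list and grows it past the bound.
    static int sizeAfterBypass() {
        List<String> shared = new ArrayList<>();
        BoundedListSketch<String> bounded = BoundedListSketch.wrapUnsafely(1, shared);
        bounded.add("a");
        shared.add("b"); // bypasses the maxLength check entirely
        return bounded.size();
    }

    // With a factory-created list there is no outside reference, so the bound holds.
    static boolean factoryEnforcesBound() {
        BoundedListSketch<String> bounded = BoundedListSketch.newArrayBacked(1, 16);
        bounded.add("a");
        try {
            bounded.add("b");
            return false;
        } catch (IllegalStateException expected) {
            return bounded.size() == 1;
        }
    }

    public static void main(String[] args) {
        System.out.println("size after bypass (maxLength = 1): " + sizeAfterBypass());
        System.out.println("factory enforces bound: " + factoryEnforcesBound());
    }
}
```

The two-argument factory keeps the `initialCapacity` option discussed elsewhere in this thread, so callers that expect large batches can still pre-size the backing list without gaining a reference to it.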
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1524056640 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { Review Comment: > I'm not sure what you mean, could you please elaborate? Sorry for the unclear comment. It seems to me that `initialCapacity` is still useful when we expect the batch to be large. For example, creating or deleting a bunch of topics is a common use case. In such cases it would be better to set the initial capacity to avoid the resize problem you described.
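One way to keep a capacity hint without exposing the backing list is a factory method that allocates the list itself. The sketch below is an assumption about how that could look, not code from the PR: `PresizedBoundedList` is a hypothetical class, and capping the hint at `maxLength` is a design choice made here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real BoundedList: it owns its backing list
// and accepts a capacity hint from the caller.
final class PresizedBoundedList<E> {
    private final int maxLength;
    private final List<E> underlying;

    private PresizedBoundedList(int maxLength, int initialCapacity) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("Invalid negative initialCapacity of " + initialCapacity);
        }
        this.maxLength = maxLength;
        // Assumption: never pre-allocate more slots than the bound allows.
        this.underlying = new ArrayList<>(Math.min(initialCapacity, maxLength));
    }

    // Factory method: the caller passes a size hint, never a list reference.
    static <E> PresizedBoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
        return new PresizedBoundedList<>(maxLength, initialCapacity);
    }

    boolean add(E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException("Cannot add another element: the maximum length is " + maxLength);
        }
        return underlying.add(element);
    }

    int size() {
        return underlying.size();
    }
}
```

A caller that expects a large batch, such as a request creating many topics, could pass the expected batch size as the hint so the backing array is allocated once.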
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1524044954 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier supplier) { +
Re: [PR] MINOR; Missing minISR config should log a debug message [kafka]
CalvinConfluent commented on code in PR #15529: URL: https://github.com/apache/kafka/pull/15529#discussion_r1523997343 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2189,7 +2189,7 @@ int getTopicEffectiveMinIsr(String topicName) { if (minIsrConfig != null) { currentMinIsr = Integer.parseInt(minIsrConfig); } else { -log.warn("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr); +log.debug("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr); Review Comment: Makes sense, thanks! Removed the error message.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523996339 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2185,9 +2179,144 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { +// If the previous cursor points to the partition 0, the cursor will not be set as the first one +// in the topic list should be the previous cursor topic. +request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() +.setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) +); +} +return new DescribeTopicPartitionsRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); +// The topicDescription for the cursor topic of the current batch. 
+TopicDescription nextTopicDescription = null; + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); + +if (error != Errors.NONE) { +future.completeExceptionally(error.exception()); +pendingTopics.remove(topicName); +if (responseCursor != null && responseCursor.topicName().equals(topicName)) { +responseCursor = null; +} +continue; +} + +TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + +if (partiallyFinished
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826885#comment-17826885 ] Apoorv Mittal commented on KAFKA-16359: --- [~ijuma] here is the fix: [https://github.com/apache/kafka/pull/15532]. The PR description explains why we need to extract the code from the plugin. > kafka-clients-3.7.0.jar published to Maven Central is defective > --- > > Key: KAFKA-16359 > URL: https://issues.apache.org/jira/browse/KAFKA-16359 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Jeremy Norris >Assignee: Apoorv Mittal >Priority: Critical > > The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is > defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} > element: > {code} > Manifest-Version: 1.0 > Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10 > .5.jar slf4j-api-1.7.36.jar > {code} > This bogus {{Class-Path}} element leads to compiler warnings for projects > that utilize it as a dependency: > {code} > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar": > no such file or directory > {code} > Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published > without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}} or > a new release should be published that corrects this defect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
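The manifest quoted in the issue above can be inspected with the JDK's own `java.util.jar.Manifest` parser, which also joins the wrapped continuation line (the one starting with a single space). This is a minimal sketch for checking a manifest's text; `ManifestClassPathCheck` and `classPathOf` are hypothetical names, and the manifest string in `main` is copied from the issue rather than read from the published jar.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

final class ManifestClassPathCheck {
    // Returns the Class-Path main attribute of the given manifest text,
    // or null when the manifest does not declare one.
    static String classPathOf(String manifestText) {
        try {
            Manifest manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
            return manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Manifest text as quoted in the issue; every line must end with a newline.
        String manifestText =
            "Manifest-Version: 1.0\n"
            + "Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10\n"
            + " .5.jar slf4j-api-1.7.36.jar\n";
        System.out.println(classPathOf(manifestText));
    }
}
```

A corrected jar would make `classPathOf` return `null` for its manifest, which is an easy condition to assert in a release check.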
[PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]
apoorvmittal10 opened a new pull request, #15532: URL: https://github.com/apache/kafka/pull/15532 The issue [KAFKA-16359](https://issues.apache.org/jira/browse/KAFKA-16359) reported that the runtime dependencies of `kafka-clients` are included in the `Class-Path` of MANIFEST.MF. The root cause is the issue described [here](https://github.com/johnrengelman/shadow/issues/324) with the usage of the `shadow` plugin. The plugin source and [documentation](https://imperceptiblethoughts.com/shadow/configuration/#configuring-the-runtime-classpath) specify that any dependency marked as `shadow` is treated as follows by the shadow plugin: 1. Adds the dependency as a runtime dependency in the resultant pom.xml - [code here](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowExtension.groovy#L32) 2. Adds the dependency to the `Class-Path` in `MANIFEST.MF` as well - [code here](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowJavaPlugin.groovy#L71) ### Resolution We do need the runtime dependencies available in the pom (1 above) but not in the manifest (2 above). Also, there is no clear way to separate the two behaviours, as both of the above tasks rely on the `shadow` configuration. To fix this, I have defined another custom configuration named `shadowed`, which is later used to populate the correct pom (the [code is similar to what the shadow plugin does](https://github.com/johnrengelman/shadow/blob/1d647471b9130d35b17771a50be382f6cb442d71/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/ShadowExtension.groovy#L32) to populate the pom for runtime dependencies). Though this might seem like a workaround, I think that's the only way to fix the issue. I have checked other SDKs which suffered from the same issue and took a similar route to populate the pom. ### Verification 1. Exploded the jar with `jar -xvf kafka-clientsjar` and compared the manifest contents before and after the change. ![Screenshot 2024-03-13 at 9 37 16 PM](https://github.com/apache/kafka/assets/2861565/bffa07f1-f8a6-4c77-a4e0-3b38c0968fc7) 2. Verified the resultant pom before and after the change. ![Screenshot 2024-03-13 at 9 39 08 PM](https://github.com/apache/kafka/assets/2861565/edf7c936-eaab-4072-91dc-f0827cf1f5af) 3. Verified the output of `jar -tf kafka-clients-3.8.0-SNAPSHOT.jar > jar_tf_output` before and after the change. ![Screenshot 2024-03-13 at 9 38 48 PM](https://github.com/apache/kafka/assets/2861565/147a8570-0295-4edb-91a0-ddc7dab1e659) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523971792 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { Review Comment: Correct, updated the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]
jsancio commented on code in PR #15478: URL: https://github.com/apache/kafka/pull/15478#discussion_r1523954492 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -348,14 +357,15 @@ final class KafkaMetadataLog private ( snapshotId.offset <= latestSnapshotId.offset && log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) => // Delete all segments that have a "last offset" less than the log start offset -log.deleteOldSegments() +val deletedSegments = log.deleteOldSegments() // Remove older snapshots from the snapshots cache -(true, forgetSnapshotsBefore(snapshotId)) +val forgottenSnapshots = forgetSnapshotsBefore(snapshotId) +(deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots) Review Comment: The old code was probably correct in practice. I just wanted to make it clearer that either condition being true is enough for this function to return true.
Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]
jsancio commented on code in PR #15478: URL: https://github.com/apache/kafka/pull/15478#discussion_r1523951275 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -404,21 +414,33 @@ final class KafkaMetadataLog private ( * all cases. * * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. + * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot + * should not be deleted */ - private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = { -if (snapshots.size < 2) + private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = { +if (snapshots.size < 2) { return false +} var didClean = false snapshots.keys.toSeq.sliding(2).foreach { case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) => -if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) { - didClean = true -} else { - return didClean +predicate(snapshot) match { + case Some(reason) => +if (deleteBeforeSnapshot(nextSnapshot, reason)) { Review Comment: Yes. `&&` only evaluates the right argument if the left argument is `true`, which is what we want because we don't want to delete the snapshot if the predicate is false.
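The control flow discussed above (delete only while the predicate supplies a reason, and stop at the first snapshot that should be kept) can be sketched in Java with `Optional` standing in for Scala's `Option`. This is an illustration under assumptions, not the `KafkaMetadataLog` code: `SnapshotCleanerSketch` and `clean` are hypothetical names, snapshots are reduced to plain offsets, reasons are plain strings, and the deletion itself is replaced by recording what would be deleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class SnapshotCleanerSketch {
    // Walks adjacent pairs (snapshot, nextSnapshot), oldest first. A snapshot is
    // recorded as deleted only if the predicate returns a reason for it; the walk
    // stops at the first snapshot without a reason. The newest snapshot is never
    // examined, because it has no successor to form a pair with.
    static List<String> clean(List<Long> snapshotOffsets,
                              Function<Long, Optional<String>> predicate) {
        List<String> deletions = new ArrayList<>();
        for (int i = 0; i + 1 < snapshotOffsets.size(); i++) {
            long snapshot = snapshotOffsets.get(i);
            Optional<String> reason = predicate.apply(snapshot);
            if (!reason.isPresent()) {
                break; // same effect as the left side of && being false
            }
            deletions.add(snapshot + ": " + reason.get());
        }
        return deletions;
    }

    public static void main(String[] args) {
        List<Long> offsets = new ArrayList<>();
        offsets.add(10L);
        offsets.add(20L);
        offsets.add(30L);
        offsets.add(40L);
        System.out.println(clean(offsets,
            offset -> offset < 25 ? Optional.of("too old") : Optional.<String>empty()));
    }
}
```

Returning the reason from the predicate keeps the "should delete" decision and its explanation in one place, so the deletion step can log why it ran.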
Re: [PR] MINOR; Make string from array [kafka]
jsancio merged PR #15526: URL: https://github.com/apache/kafka/pull/15526
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523937621 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -557,6 +557,7 @@ class TransactionStateManager(brokerId: Int, loadingPartitions.remove(partitionAndLeaderEpoch) transactionsPendingForCompletion.foreach { txnTransitMetadata => +info(s"Sending txn markers for $txnTransitMetadata after loading partition $partitionId") Review Comment: I appreciate this log as well, but I wonder if we could combine it into a single log message.
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523936892 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala: ## @@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse] val responseErrors = writeTxnMarkerResponse.errorsByProducerId - for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) { Review Comment: Again, not related to the code changes, but there is a lot of duplicated code between the disconnected and the response cases. Most of it is the same for the state error cases such as coordinator_load_in_progress. I wonder if we could extract that out and put it above... 🤔
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523931941 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala: ## @@ -39,22 +39,23 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, if (response.wasDisconnected) { trace(s"Cancelled request with header $requestHeader due to node ${response.destination} being disconnected") - for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) { -val transactionalId = txnIdAndMarker.txnId -val txnMarker = txnIdAndMarker.txnMarkerEntry + for (pendingCompleteTxnAndMarker <- pendingCompleteTxnAndMarkerEntries.asScala) { +val pendingCompleteTxn = pendingCompleteTxnAndMarker.pendingCompleteTxn +val transactionalId = pendingCompleteTxn.transactionalId +val txnMarker = pendingCompleteTxnAndMarker.txnMarkerEntry txnStateManager.getTransactionState(transactionalId) match { case Left(Errors.NOT_COORDINATOR) => info(s"I am no longer the coordinator for $transactionalId; cancel sending transaction markers $txnMarker to the brokers") -txnMarkerChannelManager.removeMarkersForTxnId(transactionalId) Review Comment: Unrelated to the changes here, but the first-person ("I am") logging here is somewhat strange 😅
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511: URL: https://github.com/apache/kafka/pull/15511#issuecomment-1995835817 Hey @lucasbru, I took another full pass and left a few comments. Thanks for the changes!
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523916766 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -419,25 +432,34 @@ class TransactionMarkerChannelManager( def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = { markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue => - for (entry: TxnIdAndMarkerEntry <- queue.asScala) -removeMarkersForTxnId(entry.txnId) + for (entry <- queue.asScala) { Review Comment: Do we have an idea of how many in-flight markers we may have at once? I just want to make sure this log message isn't too spammy. I wonder if we could log all the markers in a single log message.
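The "single log message" idea raised above could look roughly like the following: build one summary string for the whole queue instead of logging per entry. This is only a sketch of the suggestion, not code from the PR; `MarkerLogSketch`, `summarize`, the message wording, and the `maxListed` cap are all assumptions introduced here, and transactional ids are modelled as plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MarkerLogSketch {
    // Builds one log line for all pending markers of a txn topic partition.
    // At most maxListed transactional ids are spelled out, so the message
    // stays bounded even when many markers are in flight.
    static String summarize(int txnTopicPartitionId, List<String> transactionalIds, int maxListed) {
        String listed = transactionalIds.stream()
            .limit(maxListed)
            .collect(Collectors.joining(", "));
        int omitted = Math.max(0, transactionalIds.size() - maxListed);
        String suffix = omitted > 0 ? " (and " + omitted + " more)" : "";
        return "Removing " + transactionalIds.size() + " pending txn markers for txn partition "
            + txnTopicPartitionId + ": [" + listed + "]" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(summarize(7, Arrays.asList("t1", "t2", "t3"), 2));
    }
}
```

The cap trades completeness for bounded log volume; whether truncating the list is acceptable depends on how the message is used during debugging.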
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. 
Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: Well, I was expecting that we only need to trigger the callbacks if the assignment changed (it could change to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, which it already owns. By running a full reconciliation when the resolved assignment is the same as the current one but was received later, we end up with a client reconciling the exact same assignment it already owns :S I expect this would turn out noisy, accounting for 2 rebalances in cases that are probably much more common, where a newly assigned topic is temporarily not in metadata and then discovered: the member owns [t1-1] and receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 is discovered shortly after). Wouldn't we generate 2 rebalances (a 1st one with no changes in assignment, and a 2nd one with the added topic once discovered) when things truly only changed once?
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1523909237 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { Review Comment: I'm not sure what you mean, could you please elaborate?
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1523907637 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { -return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity)); -} - -public BoundedList(int maxLength, List underlying) { +public BoundedList(int maxLength) { if (maxLength <= 0) { throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength); } this.maxLength = maxLength; -if (underlying.size() > maxLength) { -throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " + -"the maximum length " + maxLength); -} -this.underlying = underlying; +this.underlying = new ArrayList<>(maxLength); Review Comment: Sorry I am confused on what you want to change to `private`? Today, passing in the source list is only used in unit tests, not any source code. If it's only used for unit tests I think it should be removed so that tests interact with the BoundedList the same way the source code does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523907297 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -109,23 +109,30 @@ object TransactionMarkerChannelManager { } -class TxnMarkerQueue(@volatile var destination: Node) { +class TxnMarkerQueue(@volatile var destination: Node) extends Logging { // keep track of the requests per txn topic partition so we can easily clear the queue // during partition emigration - private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala + private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala - def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = { + def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = { markersPerTxnTopicPartition.remove(partition) } - def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = { -val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, -new LinkedBlockingQueue[TxnIdAndMarkerEntry]()) -queue.add(txnIdAndMarker) + def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = { +val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, { + info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}") Review Comment: I noticed in the doc: ``` `createValue` may be invoked more than once if multiple threads attempt to insert a key at the same time ``` This seems ok, but just wanted to call it out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
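The `createValue` caveat quoted above can be sketched in plain Java. This is a minimal illustration, not the `CoreUtils.atomicGetOrUpdate` implementation: `getOrUpdate`, `queueFor` and `factoryCalls` are hypothetical names, and the sketch assumes a get-then-`putIfAbsent` strategy, under which the factory (and any logging inside it) may run in several racing threads while only one queue is ever stored.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class GetOrUpdateSketch {
    // Counts factory invocations so the "may be invoked more than once" caveat is observable.
    static final AtomicInteger factoryCalls = new AtomicInteger();

    // Hypothetical helper: the value is created eagerly on a miss, and
    // putIfAbsent decides which instance wins when threads race.
    static <K, V> V getOrUpdate(ConcurrentMap<K, V> map, K key, Supplier<V> createValue) {
        V existing = map.get(key);
        if (existing != null) {
            return existing;
        }
        V created = createValue.get();           // side effects here may run in several threads
        V raced = map.putIfAbsent(key, created); // only one instance is ever stored
        return raced != null ? raced : created;
    }

    static BlockingQueue<String> queueFor(ConcurrentMap<Integer, BlockingQueue<String>> map, int partition) {
        return getOrUpdate(map, partition, () -> {
            factoryCalls.incrementAndGet();      // stands in for the info(...) log line
            return new LinkedBlockingQueue<>();
        });
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, BlockingQueue<String>> map = new ConcurrentHashMap<>();
        BlockingQueue<String> first = queueFor(map, 0);
        BlockingQueue<String> second = queueFor(map, 0);
        System.out.println(first == second);     // the second lookup returns the stored queue
    }
}
```

Under this assumption an extra "Creating new marker queue" log line under contention would be harmless, since the losing queue is discarded before anything is added to it.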
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523903844 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm // Should reset epoch to leave the group and release the assignment (right away because // there is no onPartitionsLost callback defined) verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); -assertTrue(membershipManager.currentAssignment().isEmpty()); +assertTrue(membershipManager.currentAssignment().isNone()); assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty()); assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch()); } @Test public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() { Review Comment: I cannot find any test for one of the 2 issues we're after with this PR: ensure that we are triggering the callbacks when joining and getting empty assignment. Could we add it? I expect it should be very similar to this one, but just providing a `CounterConsumerRebalanceListener` and asserting that it called the `onPartitionsAssigned` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523885245 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining +if (membershipManager.memberEpoch() == 0) { data.setRebalanceTimeoutMs(rebalanceTimeoutMs); -sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs; } if (!this.subscriptions.hasPatternSubscription()) { -// SubscribedTopicNames - only sent if has changed since the last heartbeat +// SubscribedTopicNames - only sent when joining or if has changed since the last heartbeat Review Comment: inherited, but now that we're here: "or if **it** has changed..." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() { receiveAssignment(newAssignment, membershipManager); membershipManager.poll(time.milliseconds()); -verifyReconciliationNotTriggered(membershipManager); +// We bumped the local epoch, so new reconciliation is triggered Review Comment: This new reconciliation triggered is an example of what I was referring to in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) above, that seems to complicate the flow with 2 rebalances instead of 1. I wouldn't expect a member rebalancing/reconciling at this point (no assignment change for him yet, t2 not in metadata), and then reconciling/rebalance again once it discovers it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523880045 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + +// Return if the same as current assignment; comparison without creating a new collection +if (currentTargetAssignment != null) { +// check if the new assignment is different from the current target assignment +if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() && +assignment.topicPartitions().stream().allMatch( +tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) && + currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() && + currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) { +return; +} +} + +// Bump local epoch and replace assignment +long nextLocalEpoch; +if (currentTargetAssignment == null) { +nextLocalEpoch = 0; +} else { +nextLocalEpoch = currentTargetAssignment.localEpoch + 1; +} Review Comment: I was wrong in thinking that we could reuse the server epoch here: it does not truly represent new assignments for the member, and we could end up receiving multiple new assignments in the same server epoch. So I'm OK with this local epoch approach; it's the best way to keep track of "versions" of the assignment received.
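The local epoch idea discussed above can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in the PR: topics are keyed by name rather than topic ID, the comparison uses `Map.equals` instead of the allocation-free check in the diff, and the helper `of` is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

final class LocalEpochSketch {
    // Hypothetical, simplified assignment: topic name -> partitions, plus a client-side epoch.
    static final class LocalAssignment {
        static final long NONE_EPOCH = -1;
        static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());

        final long localEpoch;
        final Map<String, SortedSet<Integer>> partitions;

        LocalAssignment(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = partitions;
        }

        // Returns a new assignment with a bumped epoch only if the received partitions differ.
        Optional<LocalAssignment> updateWith(Map<String, SortedSet<Integer>> received) {
            if (localEpoch != NONE_EPOCH && partitions.equals(received)) {
                return Optional.empty(); // same target received again: keep the current epoch
            }
            return Optional.of(new LocalAssignment(localEpoch + 1, new HashMap<>(received)));
        }
    }

    // Hypothetical convenience builder for a single-topic assignment.
    static Map<String, SortedSet<Integer>> of(String topic, Integer... parts) {
        Map<String, SortedSet<Integer>> m = new HashMap<>();
        m.put(topic, new TreeSet<>(Arrays.asList(parts)));
        return m;
    }

    public static void main(String[] args) {
        LocalAssignment first = LocalAssignment.NONE.updateWith(of("t1", 1)).get();
        System.out.println(first.localEpoch);                           // first real assignment
        System.out.println(first.updateWith(of("t1", 1)).isPresent());  // repeat of the same target
    }
}
```

Because the epoch only moves when the received target changes, it counts versions of the assignment as seen by this member, independently of how many heartbeats share one server epoch.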
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523863939 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() { receiveAssignment(newAssignment, membershipManager); membershipManager.poll(time.milliseconds()); -verifyReconciliationNotTriggered(membershipManager); +// We bumped the local epoch, so new reconciliation is triggered Review Comment: This new reconciliation triggered is an example of what I was referring to in the [comment](https://github.com/apache/kafka/pull/15511/files#r1523858796) above, that seems to complicate the flow with 2 rebalances instead of 1. I wouldn't expect a member rebalancing/reconciling at this point (nothing changed for him yet, t2 not in metadata), and then reconciling/rebalance again once it discovers it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. 
Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: Well, I was expecting that we only need to trigger the callbacks if the assignment changed (it could change to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, which it already owns. By running a full reconciliation when the resolved assignment is the same as the current one but was received later, we end up with a client reconciling the exact same assignment it already owns :S I expect it would turn out noisy, accounting for 2 rebalances in cases that are probably much more common, where a newly assigned topic is temporarily not in metadata and is then discovered: the member owns [t1-1], receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 is discovered shortly after). We would generate 2 rebalances (a 1st one with no changes in assignment, a 2nd one with the added topic once discovered) when truly things only changed once. Thoughts?
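The scenario in this comment can be replayed with a small model. It is only a sketch of the concern, assuming (as the `equals` in the diff suggests) that the resolved assignment carries the target's local epoch; the `Assignment` type, the partition strings and `countReconciliations` are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class ReconcileNoiseSketch {
    // Hypothetical value type: a local epoch plus the partitions that are currently resolvable.
    static final class Assignment {
        final long localEpoch;
        final Set<String> partitions;

        Assignment(long localEpoch, String... partitions) {
            this.localEpoch = localEpoch;
            this.partitions = new HashSet<>(Arrays.asList(partitions));
        }
    }

    // Policy under discussion: epoch and partitions must both match to skip reconciliation.
    static boolean sameByEpochAndPartitions(Assignment a, Assignment b) {
        return a.localEpoch == b.localEpoch && a.partitions.equals(b.partitions);
    }

    // Previous policy: only the partitions are compared.
    static boolean sameByPartitions(Assignment a, Assignment b) {
        return a.partitions.equals(b.partitions);
    }

    // Replays: member owns [t1-1]; target [t1-1, t2-1] arrives with t2 unresolved; t2 is then discovered.
    static int countReconciliations(boolean compareEpoch) {
        Assignment current = new Assignment(0, "t1-1");
        Assignment[] resolvedOverTime = {
            new Assignment(1, "t1-1"),          // t2 not in metadata yet
            new Assignment(1, "t1-1", "t2-1")   // t2 discovered shortly after
        };
        int reconciliations = 0;
        for (Assignment resolved : resolvedOverTime) {
            boolean same = compareEpoch
                ? sameByEpochAndPartitions(resolved, current)
                : sameByPartitions(resolved, current);
            if (!same) {
                reconciliations++;
                current = resolved; // reconciliation completes; the member now owns it
            }
        }
        return reconciliations;
    }

    public static void main(String[] args) {
        System.out.println(countReconciliations(true));
        System.out.println(countReconciliations(false));
    }
}
```

In this model the epoch-aware comparison reconciles twice and the partitions-only comparison once, which is the extra rebalance the comment is asking about.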
[PR] MINOR: Change link to deprecated class to substitute class [kafka]
cadonna opened a new pull request, #15531: URL: https://github.com/apache/kafka/pull/15531 The docs for Interactive Queries contain a link to the deprecated class StreamsMetadata in package org.apache.kafka.streams.state [1]. This commit changes that link to point to the class StreamsMetadata in package org.apache.kafka.streams [2], which replaces the deprecated class. [1] https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html [2] https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/StreamsMetadata.html ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1523840166 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826844#comment-17826844 ] Phuc Hong Tran commented on KAFKA-15538: [~lianetm] Sorry for the delay, I’ll try to get it done this week. > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscriptionPattern`. > Subscribing with a java `Pattern` will still be supported for a while but will > eventually be removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
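Client-side resolution of a java `Pattern`, as described in the issue, amounts to matching the regex against the topic names the client already knows and sending the resulting list. A minimal sketch, assuming the known topics are available as a plain list; `resolve` is a hypothetical helper, not a method of the consumer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class PatternResolutionSketch {
    // Hypothetical helper: keeps the topics whose full name matches the pattern.
    static List<String> resolve(Pattern pattern, List<String> knownTopics) {
        return knownTopics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> known = Arrays.asList("orders-us", "payments", "orders-eu");
        // The resolved names are what the client would send instead of the regex itself.
        System.out.println(resolve(Pattern.compile("orders-.*"), known));
    }
}
```

The list has to be recomputed whenever metadata changes, since a topic created later may match the same pattern.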
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1523802443 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftBreak = false; +boolean outerJoinRightBreak = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; Review Comment: Good point about async window. Right now, the `KStreamKStreamJoinProcessor` does not know if it's a left-outer or full-outer join. However, we know inside `KStreamImplJoin` which is the only place in which we create `KStreamKStreamJoin`, so it should be easy to pass in this information into the Processor :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
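The effect of where the two flags are declared can be shown with a generic loop. This is only an illustration of variable scoping, not the logic of `KStreamKStreamJoin`: the `Rec` type and the emit rules are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class LoopFlagSketch {
    // Hypothetical record: which side it belongs to and whether its window is still open.
    static final class Rec {
        final boolean left;
        final boolean windowOpen;

        Rec(boolean left, boolean windowOpen) {
            this.left = left;
            this.windowOpen = windowOpen;
        }
    }

    // Flags declared outside the loop: once a side hits an open window,
    // later records of that side are skipped.
    static int emitWithFlagsOutside(List<Rec> records) {
        boolean leftBreak = false;
        boolean rightBreak = false;
        int emitted = 0;
        for (Rec r : records) {
            if (leftBreak && rightBreak) break;
            if (r.left ? leftBreak : rightBreak) continue;
            if (r.windowOpen) {
                if (r.left) leftBreak = true; else rightBreak = true;
                continue;
            }
            emitted++;
        }
        return emitted;
    }

    // Flags declared inside the loop: they reset on every iteration and never carry over.
    static int emitWithFlagsInside(List<Rec> records) {
        int emitted = 0;
        for (Rec r : records) {
            boolean leftBreak = false;
            boolean rightBreak = false;
            if (r.left ? leftBreak : rightBreak) continue; // never true
            if (r.windowOpen) {
                if (r.left) leftBreak = true; else rightBreak = true; // lost at end of iteration
                continue;
            }
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(
            new Rec(true, false), new Rec(true, true), new Rec(true, false), new Rec(false, false));
        System.out.println(emitWithFlagsOutside(records) + " vs " + emitWithFlagsInside(records));
    }
}
```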
Re: [PR] MINOR: Kafka Streams docs fixes [kafka]
mjsax commented on code in PR #15517: URL: https://github.com/apache/kafka/pull/15517#discussion_r1523775722 ## docs/streams/developer-guide/config-streams.html: ## @@ -97,6 +97,7 @@ Naming Default Values + Default Values Review Comment: Thanks. Great catch. C&p error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1995368843 @ijuma @mumrah @dajac @C0urante I'm still interested in having this change implemented, independent of hooking it into the CI build. PTAL, Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826827#comment-17826827 ] Edoardo Comar commented on KAFKA-16369: --- KafkaServer (ZK mode) needs to wait on the future returned by SocketServer.enableRequestProcessing, similarly to the BrokerServer (KRaft mode). PR in progress. > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen on is already bound, > the KafkaException: Socket server failed to bind to localhost:9092: Address > already in use. > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce in ZooKeeper mode with server.config set to listen on > localhost only: > listeners=PLAINTEXT://localhost:9092 > >
Re: [PR] MINOR: Resolve SslContextFactory method deprecations [kafka]
gharris1727 merged PR #15468: URL: https://github.com/apache/kafka/pull/15468 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]
edoardocomar opened a new pull request, #15530: URL: https://github.com/apache/kafka/pull/15530 Add a wait for all the SocketServer ports to be open and the Acceptors to be started. The BrokerServer (KRaft mode) had such a wait, which was missing from the KafkaServer (ZK mode).
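Why the missing wait lets startup continue can be sketched with a `CompletableFuture`. This is an assumption-laden illustration, not the SocketServer code: `enableRequestProcessing` here is a hypothetical stand-in that fails its future when the port is already in use, and the exception type is chosen for the sketch only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class StartupWaitSketch {
    // Hypothetical stand-in: the returned future fails if binding a port fails.
    static CompletableFuture<Void> enableRequestProcessing(boolean portAlreadyInUse) {
        CompletableFuture<Void> started = new CompletableFuture<>();
        if (portAlreadyInUse) {
            started.completeExceptionally(
                new IllegalStateException("Socket server failed to bind: Address already in use"));
        } else {
            started.complete(null);
        }
        return started;
    }

    // Without waiting, a failed future is dropped and startup appears to succeed.
    static boolean startWithoutWaiting(boolean portAlreadyInUse) {
        enableRequestProcessing(portAlreadyInUse);
        return true;
    }

    // With join(), the bind failure surfaces and the caller can shut down.
    static boolean startAndWait(boolean portAlreadyInUse) {
        try {
            enableRequestProcessing(portAlreadyInUse).join();
            return true;
        } catch (CompletionException e) {
            return false; // a real caller would trigger shutdown here
        }
    }

    public static void main(String[] args) {
        System.out.println(startWithoutWaiting(true)); // failure goes unnoticed
        System.out.println(startAndWait(true));        // failure is observed
    }
}
```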
Re: [PR] MINOR: Resolve SslContextFactory method deprecations [kafka]
gharris1727 commented on PR #15468: URL: https://github.com/apache/kafka/pull/15468#issuecomment-1995349571 Test failures appear unrelated, and the connect tests pass for me locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523734711 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) { List stateListeners() { return unmodifiableList(stateUpdatesListeners); } + +private final static class LocalAssignmentImpl implements LocalAssignment { + +private static final long NONE_EPOCH = -1; + +private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap()); + +private final long localEpoch; + +private final Map> partitions; + +public LocalAssignmentImpl(long localEpoch, Map> partitions) { +this.localEpoch = localEpoch; +this.partitions = partitions; +if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +} + +public LocalAssignmentImpl(long localEpoch, SortedSet topicIdPartitions) { +this.localEpoch = localEpoch; +this.partitions = new HashMap<>(); +if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) { +throw new IllegalArgumentException("Local epoch must be set if there are partitions"); +} +topicIdPartitions.forEach(topicIdPartition -> { +Uuid topicId = topicIdPartition.topicId(); +partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition()); +}); +} + +@Override +public String toString() { +return "{" + +"localEpoch=" + localEpoch + +", partitions=" + partitions + +'}'; +} + +@Override +public boolean equals(final Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +final LocalAssignmentImpl that = (LocalAssignmentImpl) o; +return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions); +} + +@Override +public int hashCode() { +return Objects.hash(localEpoch, partitions); +} + +@Override +public Map> getPartitions() { 
+return partitions; +} + +@Override +public boolean isNone() { +return localEpoch == NONE_EPOCH; +} + +Optional updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + +// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection +if (localEpoch != NONE_EPOCH) { +// check if the new assignment is different from the current target assignment Review Comment: nit: here we are checking if the new assignment is "the same as" the current one (not different). Maybe even just remove this comment, as the one above seems enough?
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523732734 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -109,23 +109,30 @@ object TransactionMarkerChannelManager { } -class TxnMarkerQueue(@volatile var destination: Node) { +class TxnMarkerQueue(@volatile var destination: Node) extends Logging { // keep track of the requests per txn topic partition so we can easily clear the queue // during partition emigration - private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala + private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala - def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = { + def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = { markersPerTxnTopicPartition.remove(partition) } - def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = { -val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, -new LinkedBlockingQueue[TxnIdAndMarkerEntry]()) -queue.add(txnIdAndMarker) + def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = { +val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, { + info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}") + new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]() +}) +queue.add(pendingCompleteTxnAndMarker) + +if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) { + // This could happen if the queue got removed concurrently. Review Comment: Should we do something if we added to a "dead queue"? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523717757 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1504,6 +1507,8 @@ public void handleResponse(AbstractResponse response) { fatalError(error.exception()); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { abortableError(GroupAuthorizationException.forGroupId(key)); +} else if (error == Errors.ABORTABLE_TRANSACTION_EXCEPTION) { Review Comment: The idea of this error is that we are future-proofing. In the future, if we want the producer to abort the transaction (say we have a new use case that requires this behavior), we can rely on this error to do the correct thing for older clients. (Or clients 3.8 and later in this case :) ) We ran into this issue a lot when picking error codes for KIP-890 part 1. If we had had such an error then, we could have used it for the old clients. Instead, we chose INVALID_TXN_STATE, which has inconsistent behavior across clients.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
CalvinConfluent commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523705762 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1504,6 +1507,8 @@ public void handleResponse(AbstractResponse response) { fatalError(error.exception()); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { abortableError(GroupAuthorizationException.forGroupId(key)); +} else if (error == Errors.ABORTABLE_TRANSACTION_EXCEPTION) { Review Comment: Does FindCoordinatorRequest also return the ABORTABLE_TRANSACTION_EXCEPTION?
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523709384 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); Review Comment: yes, that's the current expectation, so looks good. Let's just update the comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523702932 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { Review Comment: Ideally we should drive the pagination from the cursor, rather than spreading the logic between multiple states. We plan to get rid of topic list eventually and this logic would be easy to miss. 
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. +DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toM
[jira] [Assigned] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16369: - Assignee: Edoardo Comar > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug > Reporter: Edoardo Comar > Assignee: Edoardo Comar > Priority: Major > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen on is already bound, > the KafkaException: Socket server failed to bind to localhost:9092: Address > already in use. > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce in ZooKeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Attachment: server.log kraft-server.log
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826811#comment-17826811 ] Edoardo Comar commented on KAFKA-16369: --- server logs as broker only (zk mode) and broker/controller (kraft mode) when port 9092 is already bound [^server.log][^kraft-server.log]
[jira] [Comment Edited] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826811#comment-17826811 ] Edoardo Comar edited comment on KAFKA-16369 at 3/13/24 5:50 PM: server logs as broker only (zk mode) and broker/controller (kraft mode) when port 9092 is already bound [^server.log] [^kraft-server.log] was (Author: ecomar): server logs as broker only (zk mode) and broker/controller (kraft mode) when port 9092 is already bound [^server.log][^kraft-server.log]
[jira] [Created] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
Edoardo Comar created KAFKA-16369: - Summary: Broker may not shut down when SocketServer fails to bind as Address already in use Key: KAFKA-16369 URL: https://issues.apache.org/jira/browse/KAFKA-16369 Project: Kafka Issue Type: Bug Reporter: Edoardo Comar When in ZooKeeper mode, if a port the broker should listen on is already bound, the KafkaException: Socket server failed to bind to localhost:9092: Address already in use. is thrown but the broker continues to start up. It correctly shuts down when in KRaft mode. Easy to reproduce in ZooKeeper mode with server.config set to listen to localhost only: listeners=PLAINTEXT://localhost:9092
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
junrao commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1523677272 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; Review Comment: > It seems KIP-734 does not define the case we are discussing - return the initial/last offset when there are multi-records having same "max timestamp". Yes, I agree that we need to be consistent. Using the initial offset makes the most intuitive sense to me. We could update KIP-734 with this clarification and also send it to the original vote thread to make sure there is no objection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
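The tie-breaking rule proposed in the comment above (when several records share the maximum timestamp, report the initial one) can be sketched as follows. This is only an illustration under the assumption that record `i` of a batch has offset `baseOffset + i`. The class `MaxTimestampOffsetSketch` and its method are hypothetical and are not part of `LogValidator`.

```java
// Hypothetical helper; names and the offset model are assumptions made for illustration.
final class MaxTimestampOffsetSketch {

    /**
     * Returns the offset of the first record that carries the maximum timestamp,
     * or -1 if there are no records. Record i is assumed to have offset baseOffset + i.
     */
    static long offsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        long offsetOfMax = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // The strict comparison keeps the earliest offset when several records tie.
            if (i == 0 || timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMax = baseOffset + i;
            }
        }
        return offsetOfMax;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 300L, 300L};
        // Offsets are 10, 11, 12; the two 300s tie, so the earlier offset wins.
        System.out.println(offsetOfMaxTimestamp(10L, timestamps)); // prints 11
    }
}
```

Switching the comparison to `>=` would give the "last offset" behavior instead, which is why the thread asks for the choice to be written down in KIP-734.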
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
chia7712 commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1523669497 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: @OmniaGM @gharris1727 I'd like to ship it tomorrow if both of you have no objection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1523656781 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { -return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity)); -} - -public BoundedList(int maxLength, List underlying) { +public BoundedList(int maxLength) { if (maxLength <= 0) { throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength); } this.maxLength = maxLength; -if (underlying.size() > maxLength) { -throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " + -"the maximum length " + maxLength); -} -this.underlying = underlying; +this.underlying = new ArrayList<>(maxLength); Review Comment: It seems to me that large batch operations are unusual. It could be a performance regression if we always create the `ArrayList` with `1` initial size. If the original constructor has a potential issue with callers keeping a reference to `underlying`, we can change it from `public` to `private`. ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { Review Comment: Operations that can determine the batch size before processing can use this method.
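The alternative suggested in this review (keep the factory methods and make the wrapping constructor private, so no caller can hold a reference to the backing list) might look roughly like the sketch below. It is a trimmed-down stand-in, not the real `BoundedList`: the class `BoundedListSketch` is hypothetical, only `add` and `size` are modeled, and `IllegalStateException` stands in for `BoundedListTooLongException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for BoundedList; only add() and size() are modeled.
final class BoundedListSketch<E> {
    private final int maxLength;
    private final List<E> underlying;

    // Private, so callers cannot pass in (and keep a reference to) the backing list.
    private BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    /** Default-capacity backing list, for callers that do not know the batch size up front. */
    static <E> BoundedListSketch<E> newArrayBacked(int maxLength) {
        return new BoundedListSketch<>(maxLength, new ArrayList<>());
    }

    /** Pre-sized backing list, for callers that know the batch size before processing. */
    static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
        return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
    }

    boolean add(E element) {
        if (underlying.size() >= maxLength) {
            // Stand-in for BoundedListTooLongException in the real class.
            throw new IllegalStateException("Cannot add another element: the list is already at the maximum length " + maxLength);
        }
        return underlying.add(element);
    }

    int size() {
        return underlying.size();
    }
}
```

With this shape, the bound is still enforced on every `add`, while the initial allocation stays small unless the caller explicitly asks for a larger capacity.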
Re: [PR] Minor: downgrade the minISR warning to debug [kafka]
jsancio commented on code in PR #15529: URL: https://github.com/apache/kafka/pull/15529#discussion_r1523660494 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2189,7 +2189,7 @@ int getTopicEffectiveMinIsr(String topicName) { if (minIsrConfig != null) { currentMinIsr = Integer.parseInt(minIsrConfig); } else { -log.warn("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr); +log.debug("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr); Review Comment: This comment is for the `log.warn` message in line 2200. What exception are you trying to catch exactly? Is it a `NullPointerException`? If so, shouldn't we throw the exception up the stack? Based on the code, I would assume that an error here means the controller is trying to find the replication factor for a topic that doesn't exist, no? If you decide to keep the log message, let's log the exception itself instead of logging the exception's `toString`. ```java log.warn("Can't find the replication factor for topic: " + topicName + " using default value " + replicationFactor, e); ```
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1523649729 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -19,82 +19,236 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.Properties import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" + val topicNameWithCustomConfigs = "foo2" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false + val mockTime: Time = new MockTime(1) @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) -createTopic(topicName, 1, 1.toShort) -produceMessages() +createTopicWithConfig(topicName, new Properties()) adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } + override def brokerTime(brokerId: Int): Time = mockTime + @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val 
earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { +produceMessagesInOneBatch("gzip") +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this one batch test, it'll be the first offset 0 +verifyListOffsets(topic = topicNameWithCustomConfigs, 0) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testLatestOffset(quorum: String): Unit = { -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) -assertEquals(3, latestOffset.offset()) + def testThreeRecordsInSeparateBatch(quorum: String): Unit = { +produceMessagesInSeparateBatch() +verifyListOffsets() Review Comment: Why did we exclude the LogAppendTime test in this method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]
msn-tldr commented on PR #15498: URL: https://github.com/apache/kafka/pull/15498#issuecomment-1995048622 The tests failing on jenkins are unrelated, flaky tests.
Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]
mumrah commented on code in PR #15478: URL: https://github.com/apache/kafka/pull/15478#discussion_r1523630806 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -404,21 +414,33 @@ final class KafkaMetadataLog private ( * all cases. * * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. + * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot + * should not be deleted */ - private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = { -if (snapshots.size < 2) + private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = { +if (snapshots.size < 2) { return false +} var didClean = false snapshots.keys.toSeq.sliding(2).foreach { case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) => -if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) { - didClean = true -} else { - return didClean +predicate(snapshot) match { + case Some(reason) => +if (deleteBeforeSnapshot(nextSnapshot, reason)) { Review Comment: I assume `&&` will short-circuit in Scala like it does in Java (in which case this change looks correct) ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging { Snapshots.deleteIfExists(logDir, snapshotId) } } + + private sealed trait SnapshotDeletionReason { +def reason(snapshotId: OffsetAndEpoch): String + } + + private final case class RetentionMsBreach(now: Long, timestamp: Long, retentionMillis: Long) extends SnapshotDeletionReason { +override def reason(snapshotId: OffsetAndEpoch): String = { + s"""Marking snapshot $snapshotId for deletion because it timestamp ($timestamp) is now ($now) older than the + |retention ($retentionMillis""".stripMargin Review Comment: is there a missing ")" here, or should we not have the opening "("? 
## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -348,14 +357,15 @@ final class KafkaMetadataLog private ( snapshotId.offset <= latestSnapshotId.offset && log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) => // Delete all segments that have a "last offset" less than the log start offset -log.deleteOldSegments() +val deletedSegments = log.deleteOldSegments() // Remove older snapshots from the snapshots cache -(true, forgetSnapshotsBefore(snapshotId)) +val forgottenSnapshots = forgetSnapshotsBefore(snapshotId) +(deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots) Review Comment: Interesting, did we have a bug here previously? Looks like before we would always report `deleted=true` even if nothing was deleted. I wonder if we even use this return value 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
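The predicate contract discussed in this review (a reason means "delete this snapshot", no reason means "keep it and stop") can be sketched in Java with `Optional`. This is a simplified illustration, not `KafkaMetadataLog`: snapshot ids are plain `long` offsets instead of `OffsetAndEpoch`, and the class `SnapshotCleanerSketch`, its method, and the message format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch; snapshot ids are plain offsets rather than OffsetAndEpoch.
final class SnapshotCleanerSketch {

    /**
     * Walks the snapshots from oldest to newest. A snapshot is removed only if the predicate
     * returns a reason for it, and the walk stops at the first snapshot that must be kept.
     * The newest snapshot is never removed. The given list is modified in place, and the
     * returned list holds one message per deleted snapshot.
     */
    static List<String> cleanSnapshots(List<Long> snapshots, Function<Long, Optional<String>> predicate) {
        List<String> deletions = new ArrayList<>();
        while (snapshots.size() >= 2) {
            Long oldest = snapshots.get(0);
            Optional<String> reason = predicate.apply(oldest);
            if (!reason.isPresent()) {
                break; // keep this snapshot and everything newer
            }
            snapshots.remove(0);
            deletions.add("Deleting snapshot " + oldest + ": " + reason.get());
        }
        return deletions;
    }
}
```

Returning the reason from the predicate, rather than a bare boolean, is what lets the caller log why each snapshot was deleted; an empty result list also tells the caller that nothing was cleaned.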
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523608929 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -42,14 +42,26 @@ object AddPartitionsToTxnManager { val VerificationTimeMsMetricName = "VerificationTimeMs" } +/** + * This is an enum which handles the Partition Response based on the Produce Request Version and the exact operation + *defaultOperation: This is the default workflow which maps to cases when the Produce Request Version was lower than expected or when exercising the offset commit request path Review Comment: It is a similar process to pass in the value, but we will need it for both group coordinator paths. Let me know if you need help.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523603267 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -42,14 +42,26 @@ object AddPartitionsToTxnManager { val VerificationTimeMsMetricName = "VerificationTimeMs" } +/** + * This is an enum which handles the Partition Response based on the Produce Request Version and the exact operation + *defaultOperation: This is the default workflow which maps to cases when the Produce Request Version was lower than expected or when exercising the offset commit request path Review Comment: I think we should include this change in the offset commit path. It will require bumping that request version as well. > We will bump the ProduceRequest/Response and TxnOffsetCommitRequest/Response version to indicate the client is using the new protocol that doesn’t require adding partitions to transactions and will implicitly do so. The bump will also support new errors ABORTABLE_ERROR We are just doing 2 bumps for the above comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523597267 ## clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class AbortableTransactionException extends ApiException { Review Comment: I'm still thinking on this name as well. Since all transactions are technically abortable, and what we want to convey is that the only way forward is to abort, would it make sense to say TransactionAbortableException? Maybe it doesn't make a huge difference though. 😅 Wondering if there is a way to make that distinction of abortable vs we need to abort to proceed. I realize though that we are also modeling off of RetriableException here and trying to keep that similar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Minor: downgrade the minISR warning to debug [kafka]
CalvinConfluent opened a new pull request, #15529: URL: https://github.com/apache/kafka/pull/15529 Downgrade the log level to avoid spamming the log
[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts
[ https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16226: -- Fix Version/s: 3.6.2 (was: 3.6.3) > Java client: Performance regression in Trogdor benchmark with high partition > counts > --- > > Key: KAFKA-16226 > URL: https://issues.apache.org/jira/browse/KAFKA-16226 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0, 3.6.1 >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Labels: kip-951 > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png > > > h1. Background > https://issues.apache.org/jira/browse/KAFKA-15415 implemented an optimisation in > the java-client to skip the backoff period if the client knows of a newer leader for > the produce-batch being retried. > h1. What changed > The implementation introduced a regression noticed on a trogdor-benchmark > running with high partition counts (36000!). > With the regression, the following metrics changed on the produce side. > # record-queue-time-avg: increased from 20ms to 30ms. > # request-latency-avg: increased from 50ms to 100ms. > h1. Why it happened > As can be seen from the original > [PR|https://github.com/apache/kafka/pull/14384] > RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using the > synchronised method Metadata.currentLeader(). This has led to increased > synchronization between KafkaProducer's application-thread that calls send(), > and the background-thread that actively sends producer-batches to leaders. > Lock profiles clearly show increased synchronisation in the KAFKA-15415 > PR (highlighted in {color:#de350b}Red{color}) vs baseline (see below). Note > the synchronisation is much worse for partitionReady() in this benchmark as > it's called for each partition, and it has 36k partitions! > h3. Lock Profile: Kafka-15415 > !kafka_15415_lock_profile.png! > h3. 
Lock Profile: Baseline > !baseline_lock_profile.png! 
> h1. Fix > Synchronization between the two threads has to be reduced in order to address this. > [https://github.com/apache/kafka/pull/15323] is a fix for it: it avoids > using Metadata.currentLeader() and instead relies on Cluster.leaderFor(). > With the fix, lock-profile & metrics are similar to baseline. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
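To illustrate the contention described in KAFKA-16226, here is a minimal Java sketch. It is not the code from PR #15323: `LockedLeaderView` and `LeaderLookupSketch` are hypothetical names, and a plain map from partition number to leader id stands in for the real client metadata. It contrasts calling a synchronized accessor once per partition with taking one immutable snapshot and reading from it without further locking.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metadata holder whose accessors are synchronized. */
final class LockedLeaderView {
    private final Map<Integer, Integer> leaderByPartition = new HashMap<>();

    synchronized void update(int partition, int leaderId) {
        leaderByPartition.put(partition, leaderId);
    }

    // Every per-partition lookup takes the monitor, so a loop over many
    // partitions contends with any other thread that uses this object.
    synchronized Integer currentLeader(int partition) {
        return leaderByPartition.get(partition);
    }

    // Takes the monitor once and hands out an immutable copy.
    synchronized Map<Integer, Integer> snapshot() {
        return Map.copyOf(leaderByPartition);
    }
}

final class LeaderLookupSketch {
    /** Counts partitions with a known leader by reading a snapshot; no locking in the loop. */
    static int countWithLeader(Map<Integer, Integer> snapshot, int partitionCount) {
        int known = 0;
        for (int partition = 0; partition < partitionCount; partition++) {
            if (snapshot.get(partition) != null) {
                known++;
            }
        }
        return known;
    }
}
```

The trade-off in this sketch is that a snapshot can be slightly stale: updates made after `snapshot()` returns are not visible to the loop that reads it.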
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1994913616 @showuon @jolshan Would appreciate if you could take a look.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1523586414 ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -392,7 +393,8 @@ public enum Errors { UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new), TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new), -INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new); +INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new), +ABORTABLE_TRANSACTION_EXCEPTION(120, "This maps to all errors which can be fixed by aborting the current transaction.", AbortableTransactionException::new); Review Comment: nit: let's leave off the "exception" part to be consistent with the other errors here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Only enable replay methods to modify timeline data structure [kafka]
dongnuo123 commented on code in PR #15528: URL: https://github.com/apache/kafka/pull/15528#discussion_r1523584782 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -593,7 +595,8 @@ public List describeGroups( */ ConsumerGroup getOrMaybeCreateConsumerGroup( String groupId, -boolean createIfNotExists +boolean createIfNotExists, +boolean addToGroupsMap Review Comment: Yeah, you're right. I added the flag to reduce redundancy, but it does bring a risk of misuse... Let me change this.
Re: [PR] MINOR: Only enable replay methods to modify timeline data structure [kafka]
dajac commented on code in PR #15528: URL: https://github.com/apache/kafka/pull/15528#discussion_r1523579678 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -593,7 +595,8 @@ public List describeGroups( */ ConsumerGroup getOrMaybeCreateConsumerGroup( String groupId, -boolean createIfNotExists +boolean createIfNotExists, +boolean addToGroupsMap Review Comment: I am not a fan of this new boolean flag because there is a chance of misusing it. Have you considered keeping `getOrMaybeCreateConsumerGroup` on the `consumerGroupHeartbeat` path (without the part which adds to the map) and changing the replay logic?
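One possible reading of this suggestion, sketched in Java under stated assumptions: `GroupRegistrySketch`, `getOrMaybeCreate`, `replayGroupCreated` and `isRegistered` are hypothetical names, and a `String` stands in for the group object. This is not the code that ended up in the PR. The point is only that the request path never mutates the map, and a single replay method is the one place that does, so no caller can pass the wrong flag value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry; a String stands in for the real group object. */
final class GroupRegistrySketch {
    private final Map<String, String> groups = new HashMap<>();

    /**
     * Request path: returns the registered group, or builds a fresh one when
     * allowed. It never stores the new object, so it cannot change the map.
     */
    String getOrMaybeCreate(String groupId, boolean createIfNotExists) {
        String group = groups.get(groupId);
        if (group == null && createIfNotExists) {
            return "group:" + groupId; // created but deliberately not registered
        }
        return group; // may be null when the group is unknown
    }

    /** Replay path: the only method in this sketch that writes to the map. */
    String replayGroupCreated(String groupId) {
        return groups.computeIfAbsent(groupId, id -> "group:" + id);
    }

    boolean isRegistered(String groupId) {
        return groups.containsKey(groupId);
    }
}
```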
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1523572950 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: Yeah, this could potentially happen. That's the tradeoff that we have to make here. However, I would like to point out that on the client, the partition is considered assigned even if the client may not be able to fetch from it (e.g. in case of stale metadata) so the contract is loose anyway here. If it turns out to be an issue in the future, I think that we could have a timer for individual partitions. I refrained myself from doing this because it brings a lot of complexity and it does not seem worth it at the moment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1523571839 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: I can't see why we care about the ref in some of these tests but if this is the intent then keeping `assertSame` is okay. My comment was more a question whether we need the test to be explicit or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1523564661 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: Ah. I was actually just investigating an incident where we had an issue with a deleted partition with the old protocol. 😅 So this makes sense. I guess my only remaining question is whether there is a case where we could get stuck not consuming an existing partition that is assigned to a live member, but that member is not able to get the assignment but is able to heartbeat. This is the case where an assignment timeout could also free this assigned partition to be assigned to another member that may be able to take it. I agree though that it is better in the current state (not blocking the members from getting to the reconcillation etc). -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
gharris1727 commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1523535566 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: When I wrote this test originally, I did intentionally use assertSame to do a reference equality check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
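For readers following the `assertSame` discussion, the following is a small plain-Java sketch (no JUnit, so it stays self-contained) of what a reference check on the cause and the suppressed exception verifies. `SuppressedCloseSketch` and `wrap` are hypothetical names and do not come from `WorkerSinkTask`; the sketch only assumes that a failure from put is wrapped as the cause and a later failure from close is attached as suppressed.

```java
/** Hypothetical helper mirroring the shape of the exception the test inspects. */
final class SuppressedCloseSketch {
    /** Wraps a put failure as the cause and attaches a close failure as suppressed. */
    static RuntimeException wrap(RuntimeException putFailure, RuntimeException closeFailure) {
        RuntimeException wrapper = new RuntimeException("task failed", putFailure);
        wrapper.addSuppressed(closeFailure);
        return wrapper;
    }

    public static void main(String[] args) {
        RuntimeException putFailure = new RuntimeException("put failed");
        RuntimeException closeFailure = new RuntimeException("close failed");
        RuntimeException thrown = wrap(putFailure, closeFailure);

        // Reference checks: the very same objects came back out.
        System.out.println(thrown.getCause() == putFailure);            // true
        System.out.println(thrown.getSuppressed()[0] == closeFailure);  // true

        // A different exception with an identical message is not the same object.
        System.out.println(thrown.getCause() == new RuntimeException("put failed")); // false
    }
}
```

Note that `Throwable` does not override `equals`, so for plain exceptions an equality check and a reference check give the same answer; `assertSame` simply states the intent explicitly.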
[jira] [Updated] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16217: -- Fix Version/s: 3.8.0 3.6.3 (was: 3.6.2) > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Kirk True >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. 
If the producer is in a bad > transaction state(in my case, the TransactionManager.pendingTransition was > set to commitTransaction and did not get cleaned), then the producer calls > close and tries to abort the existing transaction, the producer will get > stuck in the transaction abortion. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16226) Java client: Performance regression in Trogdor benchmark with high partition counts
[ https://issues.apache.org/jira/browse/KAFKA-16226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16226: -- Fix Version/s: 3.6.3 (was: 3.6.2) > Java client: Performance regression in Trogdor benchmark with high partition > counts > --- > > Key: KAFKA-16226 > URL: https://issues.apache.org/jira/browse/KAFKA-16226 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0, 3.6.1 >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Labels: kip-951 > Fix For: 3.8.0, 3.7.1, 3.6.3 > > Attachments: baseline_lock_profile.png, kafka_15415_lock_profile.png > > > h1. Background > https://issues.apache.org/jira/browse/KAFKA-15415 implemented an optimisation in > the java-client to skip the backoff period if the client knows of a newer leader for > the produce-batch being retried. > h1. What changed > The implementation introduced a regression noticed on a trogdor-benchmark > running with high partition counts (36000!). > With the regression, the following metrics changed on the produce side. > # record-queue-time-avg: increased from 20ms to 30ms. > # request-latency-avg: increased from 50ms to 100ms. > h1. Why it happened > As can be seen from the original > [PR|https://github.com/apache/kafka/pull/14384] > RecordAccumulator.partitionReady() & drainBatchesForOneNode() started using the > synchronised method Metadata.currentLeader(). This has led to increased > synchronization between KafkaProducer's application-thread that calls send(), > and the background-thread that actively sends producer-batches to leaders. > Lock profiles clearly show increased synchronisation in the KAFKA-15415 > PR (highlighted in {color:#de350b}Red{color}) vs baseline (see below). Note > the synchronisation is much worse for partitionReady() in this benchmark as > it's called for each partition, and it has 36k partitions! > h3. Lock Profile: Kafka-15415 > !kafka_15415_lock_profile.png! > h3. 
Lock Profile: Baseline > !baseline_lock_profile.png! 
> h1. Fix > Synchronization between the two threads has to be reduced in order to address this. > [https://github.com/apache/kafka/pull/15323] is a fix for it: it avoids > using Metadata.currentLeader() and instead relies on Cluster.leaderFor(). > With the fix, lock-profile & metrics are similar to baseline.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523523053 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions. When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { +data.setTopicPartitions(Collections.emptyList()); +sentFields.topicPartitions = null; +} else if (!local.equals(sentFields.topicPartitions)) { Review Comment: Renamed the field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16120) Fix partition reassignment during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16120: - Fix Version/s: 3.6.2 > Fix partition reassignment during ZK migration > -- > > Key: KAFKA-16120 > URL: https://issues.apache.org/jira/browse/KAFKA-16120 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > When a reassignment is completed in ZK migration hybrid mode, the > `StopReplica` sent by the kraft quorum migration propagator is sent with > `delete = false` for deleted replicas when processing the topic delta. This > results in stray replicas. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16120) Fix partition reassignment during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16120: - Affects Version/s: 3.6.1 3.6.0 > Fix partition reassignment during ZK migration > -- > > Key: KAFKA-16120 > URL: https://issues.apache.org/jira/browse/KAFKA-16120 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0, 3.6.1 >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > When a reassignment is completed in ZK migration hybrid mode, the > `StopReplica` sent by the kraft quorum migration propagator is sent with > `delete = false` for deleted replicas when processing the topic delta. This > results in stray replicas. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: Done ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + Review Comment: Implemented both changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523522030 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager { private final Map assignedTopicNamesCache; /** - * Topic IDs and partitions received in the last target assignment. Items are added to this set - * every time a target assignment is received. This is where the member collects the assignment - * received from the broker, even though it may not be ready to fully reconcile due to missing - * metadata. + * Topic IDs and partitions received in the last target assignment, together with its local epoch. + * + * This member reassigned every time a new assignment is received. Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15689) KRaftMigrationDriver not logging the skipped event when expected state is wrong
[ https://issues.apache.org/jira/browse/KAFKA-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15689: - Fix Version/s: 3.6.2 > KRaftMigrationDriver not logging the skipped event when expected state is > wrong > --- > > Key: KAFKA-15689 > URL: https://issues.apache.org/jira/browse/KAFKA-15689 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 3.7.0, 3.6.2 > > > KRaftMigrationDriver.checkDriverState is used in multiple implementations > of the > MigrationEvent base class, but when it logs that an event was skipped > because the expected state is wrong, it always logs "KRaftMigrationDriver" > instead of the skipped event. > For example, a logging line could look like this: > {code:java} > 2023-10-25 12:17:25,460 INFO [KRaftMigrationDriver id=5] Expected driver > state ZK_MIGRATION but found SYNC_KRAFT_TO_ZK. Not running this event > KRaftMigrationDriver. > (org.apache.kafka.metadata.migration.KRaftMigrationDriver) > [controller-5-migration-driver-event-handler] {code} > This is because its code has something like this: > {code:java} > log.info("Expected driver state {} but found {}. Not running this event {}.", > expectedState, migrationState, this.getClass().getSimpleName()); {code} > Of course, the "this" refers to the KRaftMigrationDriver class. > It should print the specific skipped event instead.
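The behaviour reported in KAFKA-15689 can be reproduced in a few lines of plain Java. This is a sketch, not the actual driver code: `DriverSketch`, `PollEvent` and the method names are hypothetical, and passing the event name as an argument is just one way the message could be corrected.

```java
/** Hypothetical driver with an inner event class, mirroring the reported structure. */
final class DriverSketch {
    /** As in the report: here `this` is the driver, so the driver's name is printed. */
    String skippedMessageBuggy() {
        return "Not running this event " + this.getClass().getSimpleName() + ".";
    }

    /** One possible correction: the caller supplies the name of the skipped event. */
    String skippedMessageFixed(String eventName) {
        return "Not running this event " + eventName + ".";
    }

    /** Hypothetical event; a non-static inner class, so it can call the driver's methods. */
    final class PollEvent {
        String runBuggy() {
            return skippedMessageBuggy();
        }

        String runFixed() {
            // Inside the event, `this` is the event itself.
            return skippedMessageFixed(this.getClass().getSimpleName());
        }
    }
}
```

With `DriverSketch.PollEvent event = new DriverSketch().new PollEvent();`, `event.runBuggy()` names `DriverSketch` while `event.runFixed()` names `PollEvent`.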
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1523520659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -530,19 +530,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member -membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { -data.setInstanceId(groupInstanceId); -sentFields.instanceId = groupInstanceId; -} -}); +// InstanceId - always send when leaving the group as a static member +membershipManager.groupInstanceId().ifPresent(data::setInstanceId); Review Comment: I updated the instance ID so that it is always sent when present. I think the last information was that the GC relies on this.
[jira] [Resolved] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers
[ https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur resolved KAFKA-16171.
----------------------------------
    Resolution: Fixed

> Controller failover during ZK migration can prevent metadata updates to ZK brokers
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16171
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16171
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft, migration
>    Affects Versions: 3.6.0, 3.7.0, 3.6.1
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 3.6.2, 3.7.0
>
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a
> state called hybrid mode. This means we have a mixture of ZK and KRaft
> brokers. The KRaft controller updates the ZK brokers using the deprecated
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc).
>
> A race condition exists where the KRaft controller will get stuck in a retry
> loop while initializing itself after a failover, which prevents it from
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK
> brokers will not receive any metadata updates. The ZK brokers will be able to
> send requests to the controller (such as AlterPartitions), but the metadata
> updates which come as a result of those requests will never be seen. This
> essentially looks like the controller is unavailable from the ZK brokers'
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This indicates that another KRaft controller is making writes to ZooKeeper.
> {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This indicates that another KRaft controller is making writes to ZooKeeper.
>     at kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>     at kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>     at kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>     at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>     at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>     at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>     at kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>     at kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>     at kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>     at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>     at java.base/java.lang.Thread.run(Thread.java:1583)
>     at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66)
> {noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can
> be done by restarting the problematic active controller. To verify that the
> new c
[jira] [Updated] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar updated KAFKA-16139:
------------------------------
    Fix Version/s: 3.6.2
                       (was: 3.6.1)

> StreamsUpgradeTest fails consistently in 3.7.0
> ----------------------------------------------
>
>                 Key: KAFKA-16139
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16139
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, system tests
>    Affects Versions: 3.7.0
>            Reporter: Stanislav Kozlovski
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 3.7.0, 3.6.2
>
>
> h1.
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces
> Arguments:\{ “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”}
>
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on ubuntu@worker2')}}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
[ https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar updated KAFKA-15817:
------------------------------
    Fix Version/s: 3.6.2

> Avoid reconnecting to the same IP address if multiple addresses are available
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-15817
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15817
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Bob Barrett
>            Assignee: Bob Barrett
>            Priority: Major
>             Fix For: 3.7.0, 3.6.2
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS
> resolution behavior for clients to re-resolve DNS after disconnecting from a
> broker, rather than wait until we iterated over all addresses from a given
> resolution. This is useful when the IP addresses have changed between the
> connection and disconnection.
> However, with the behavior change, this does mean that clients could
> potentially reconnect immediately to the same IP they just disconnected from,
> if the IPs have not changed. In cases where the disconnection happened
> because that IP was unhealthy (such as a case where a load balancer has
> instances in multiple availability zones and one zone is unhealthy, or a case
> where an intermediate component in the network path is going through a
> rolling restart), this will delay the client successfully reconnecting. To
> address this, clients should remember the IP they just disconnected from and
> skip that IP when reconnecting, as long as the address resolved to multiple
> addresses.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
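
The approach proposed in KAFKA-15817 (remember the IP that was just disconnected and skip it on the next attempt, but only when resolution returned more than one address) can be illustrated with a minimal sketch. This is not the code in the Kafka client; the class ReconnectAddressSelector and its methods recordDisconnect, candidates, and literal are hypothetical names chosen for illustration, and the fallback to the full list when filtering leaves nothing is an assumption of this sketch.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper illustrating the idea; not Kafka's actual client code. */
final class ReconnectAddressSelector {
    // The address of the most recent disconnect, or null if there was none yet.
    private InetAddress lastDisconnected;

    /** Remember the IP the client has just disconnected from. */
    void recordDisconnect(InetAddress address) {
        lastDisconnected = address;
    }

    /**
     * Given a fresh DNS resolution, return the addresses worth trying.
     * The last-disconnected IP is skipped only if other addresses exist.
     */
    List<InetAddress> candidates(List<InetAddress> resolved) {
        if (lastDisconnected == null || resolved.size() <= 1) {
            return new ArrayList<>(resolved);
        }
        List<InetAddress> filtered = new ArrayList<>();
        for (InetAddress address : resolved) {
            if (!address.equals(lastDisconnected)) {
                filtered.add(address);
            }
        }
        // Assumption: if filtering removed everything, retry the full list
        // rather than leave the client with no address at all.
        return filtered.isEmpty() ? new ArrayList<>(resolved) : filtered;
    }

    /** Parse an IP literal (no DNS lookup is performed for literals). */
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not a valid IP literal: " + ip, e);
        }
    }

    public static void main(String[] args) {
        ReconnectAddressSelector selector = new ReconnectAddressSelector();
        InetAddress a = literal("10.0.0.1");
        InetAddress b = literal("10.0.0.2");
        selector.recordDisconnect(a);
        // With two resolved addresses, only the one not just disconnected remains.
        System.out.println(selector.candidates(Arrays.asList(a, b)));
        // With a single resolved address, it is still returned.
        System.out.println(selector.candidates(Arrays.asList(a)));
    }
}
```

In this sketch a single-address resolution is never filtered, which matches the ticket's condition "as long as the address resolved to multiple addresses".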
[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers
[ https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur updated KAFKA-16171:
---------------------------------
    Fix Version/s: 3.6.2

> Controller failover during ZK migration can prevent metadata updates to ZK brokers
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16171
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16171
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft, migration
>    Affects Versions: 3.6.0, 3.7.0, 3.6.1
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 3.7.0, 3.6.2
>
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a
> state called hybrid mode. This means we have a mixture of ZK and KRaft
> brokers. The KRaft controller updates the ZK brokers using the deprecated
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc).
>
> A race condition exists where the KRaft controller will get stuck in a retry
> loop while initializing itself after a failover, which prevents it from
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK
> brokers will not receive any metadata updates. The ZK brokers will be able to
> send requests to the controller (such as AlterPartitions), but the metadata
> updates which come as a result of those requests will never be seen. This
> essentially looks like the controller is unavailable from the ZK brokers'
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This indicates that another KRaft controller is making writes to ZooKeeper.
> {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This indicates that another KRaft controller is making writes to ZooKeeper.
>     at kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>     at kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>     at kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>     at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>     at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>     at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>     at kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>     at kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>     at kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>     at org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>     at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>     at java.base/java.lang.Thread.run(Thread.java:1583)
>     at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66)
> {noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can
> be done by restarting the problematic active controller. To verify that the
> new