[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery
tombentley commented on a change in pull request #11006: URL: https://github.com/apache/kafka/pull/11006#discussion_r668471349 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File], val logDirAbsolutePath = dir.getAbsolutePath var hadCleanShutdown: Boolean = false try { -val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) +val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory { Review comment: Yes, thanks! I've now factored out a common method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tang7526 commented on pull request #11034: KAFKA-13075: Consolidate RocksDBStore and RocksDBKeyValueStoreTest
tang7526 commented on pull request #11034: URL: https://github.com/apache/kafka/pull/11034#issuecomment-878820896 @ableegoldman Could you help review this PR? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery
tombentley commented on a change in pull request #11006: URL: https://github.com/apache/kafka/pull/11006#discussion_r668466658 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File], val logDirAbsolutePath = dir.getAbsolutePath var hadCleanShutdown: Boolean = false try { -val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) +val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory { + private val factory = Executors.defaultThreadFactory() + private val threadNumber = new AtomicInteger(1) Review comment: Only as a means of having unique names when >1 thread per log dir. You can't really infer anything from the number though, so I could remove it if you want. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
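For readers unfamiliar with the JDK hook being discussed here, the naming is done by passing a custom ThreadFactory to Executors.newFixedThreadPool. The following is a minimal, self-contained Java sketch of the idea; the LogManager change itself is in Scala, and the exact thread-name format and counter usage below are illustrative assumptions, not the committed patch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedRecoveryPool {

    // Builds a fixed-size pool whose threads carry a descriptive name instead of the
    // JDK default "pool-N-thread-M"; the name format here is an assumption.
    public static ExecutorService recoveryPool(final String logDirAbsolutePath, final int numThreads) {
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final ThreadFactory namingFactory = runnable -> {
            final Thread thread = defaultFactory.newThread(runnable);
            // The counter only matters for uniqueness when more than one recovery thread
            // is configured per data dir, which is exactly the trade-off debated above.
            thread.setName("log-recovery-" + logDirAbsolutePath + "-" + threadNumber.getAndIncrement());
            return thread;
        };
        return Executors.newFixedThreadPool(numThreads, namingFactory);
    }

    public static void main(final String[] args) {
        final ExecutorService pool = recoveryPool("/tmp/kafka-logs", 2);
        pool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```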
[GitHub] [kafka] tang7526 opened a new pull request #11034: KAFKA-13075: Consolidate RocksDBStore and RocksDBKeyValueStoreTest
tang7526 opened a new pull request #11034: URL: https://github.com/apache/kafka/pull/11034 https://issues.apache.org/jira/browse/KAFKA-13075 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13075) Consolidate RocksDBStore and RocksDBKeyValueStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chun-Hao Tang reassigned KAFKA-13075: - Assignee: Chun-Hao Tang > Consolidate RocksDBStore and RocksDBKeyValueStoreTest > - > > Key: KAFKA-13075 > URL: https://issues.apache.org/jira/browse/KAFKA-13075 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Chun-Hao Tang >Priority: Major > Labels: newbie, newbie++ > > Looks like we have two different test classes covering pretty much the same > thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original > test class for RocksDBStore, but someone later added RocksDBStoreTest, most > likely because they didn't notice the RocksDBKeyValueStoreTest which didn't > follow the usual naming scheme for test classes. > We should consolidate these two into a single file, ideally retaining the > RocksDBStoreTest name since that conforms to the test naming pattern used > throughout Streams (and so this same thing doesn't happen again). It should > also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest > currently does so we continue to get the benefit of all the tests in there as > well -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug
guozhangwang commented on pull request #10985: URL: https://github.com/apache/kafka/pull/10985#issuecomment-878781675 Just one more reply on the clearing logic, otherwise LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
guozhangwang commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668428677 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set allTopics, // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions numMembersAssignedOverMinQuota++; +if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { +potentiallyUnfilledMembersAtMinQuota.clear(); Review comment: Yes, that makes sense. Still, this logic ``` if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { potentiallyUnfilledMembersAtMinQuota.clear(); } ``` seems only needed because we have the check on line 309 (?) Say we do not check that, but instead just check that the expected numbers of consumers with minQuota and maxQuota are satisfied, then do we still need this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13075) Consolidate RocksDBStore and RocksDBKeyValueStoreTest
A. Sophie Blee-Goldman created KAFKA-13075: -- Summary: Consolidate RocksDBStore and RocksDBKeyValueStoreTest Key: KAFKA-13075 URL: https://issues.apache.org/jira/browse/KAFKA-13075 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Looks like we have two different test classes covering pretty much the same thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original test class for RocksDBStore, but someone later added RocksDBStoreTest, most likely because they didn't notice the RocksDBKeyValueStoreTest which didn't follow the usual naming scheme for test classes. We should consolidate these two into a single file, ideally retaining the RocksDBStoreTest name since that conforms to the test naming pattern used throughout Streams (and so this same thing doesn't happen again). It should also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest currently does so we continue to get the benefit of all the tests in there as well -- This message was sent by Atlassian Jira (v8.3.4#803005)
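The consolidation this ticket asks for mostly comes down to the class declaration. Below is a rough Java sketch of the intended shape; the abstract hook that AbstractKeyValueStoreTest requires subclasses to implement is assumed here, so the method name and signature are illustrative only and should be checked against the real base class before doing the work.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Illustrative shape only: verify the real abstract hook on AbstractKeyValueStoreTest.
public class RocksDBStoreTest extends AbstractKeyValueStoreTest {

    @Override
    protected KeyValueStore<Integer, String> createKeyValueStore(final ProcessorContext context) {
        final KeyValueStore<Integer, String> store = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore("rocksdb-store"),
                Serdes.Integer(), Serdes.String())
            .build();
        store.init(context, store);
        return store;
    }

    // RocksDB-specific cases from the old RocksDBStoreTest (config setter, metrics,
    // bulk-loading behaviour, etc.) would then live here next to the inherited tests.
}
```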
[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
guozhangwang commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668427821 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics, allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than -// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers +// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { -unfilledMembers.add(consumer); +potentiallyUnfilledMembersAtMinQuota.add(consumer); Review comment: Ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on pull request #11033: URL: https://github.com/apache/kafka/pull/11033#issuecomment-878780060 This change is built on top of https://github.com/apache/kafka/pull/10579. The latest commit https://github.com/apache/kafka/pull/11033/commits/b7837cdc01148b5e1957bb43ed98c8f3877655c3 includes the change relevant to this PR. Earlier commits will be merged as part of https://github.com/apache/kafka/pull/10579. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces
[ https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12925: --- Priority: Critical (was: Major) > prefixScan missing from intermediate interfaces > --- > > Key: KAFKA-12925 > URL: https://issues.apache.org/jira/browse/KAFKA-12925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Michael Viamari >Assignee: Sagar Rao >Priority: Critical > Fix For: 3.0.0, 2.8.1 > > > [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores] > and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] > introduced support for {{prefixScan}} to StateStores. > It seems that many of the intermediate {{StateStore}} interfaces are missing > a definition for {{prefixScan}}, and as such it is not accessible in all cases. > For example, when accessing the state stores through the processor context, > the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not > define {{prefixScan}} and it falls back to the default implementation in > {{KeyValueStore}}, which throws {{UnsupportedOperationException}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
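The failure mode described in this ticket is the classic default-method pitfall: an interface method added later with a throwing default is silently "implemented" by every decorator that forgets to override it. A self-contained Java sketch of the pattern follows, using made-up names rather than the actual Streams classes.

```java
import java.util.Collections;
import java.util.Iterator;

public class DefaultMethodPitfall {

    interface ReadOnlyStore<K, V> {
        V get(K key);

        // Added to the interface later with a throwing default; this mirrors how
        // prefixScan landed without every wrapper being updated to override it.
        default Iterator<V> prefixScan(K prefix) {
            throw new UnsupportedOperationException("prefixScan not supported");
        }
    }

    // A decorator that only forwards the methods it knows about. Because it never
    // overrides prefixScan, callers hit the throwing default even though the wrapped
    // store has a working implementation.
    static class ForwardingStore<K, V> implements ReadOnlyStore<K, V> {
        private final ReadOnlyStore<K, V> inner;

        ForwardingStore(final ReadOnlyStore<K, V> inner) {
            this.inner = inner;
        }

        @Override
        public V get(final K key) {
            return inner.get(key);
        }

        // The fix in KAFKA-12925 amounts to adding forwarding overrides like:
        // @Override public Iterator<V> prefixScan(K prefix) { return inner.prefixScan(prefix); }
    }

    public static void main(final String[] args) {
        final ReadOnlyStore<String, String> inner = new ReadOnlyStore<String, String>() {
            @Override
            public String get(final String key) {
                return "value-for-" + key;
            }

            @Override
            public Iterator<String> prefixScan(final String prefix) {
                return Collections.singletonList("value-for-" + prefix).iterator();
            }
        };

        final ReadOnlyStore<String, String> wrapped = new ForwardingStore<>(inner);
        System.out.println(wrapped.get("a"));   // forwarded fine
        wrapped.prefixScan("a");                // throws UnsupportedOperationException
    }
}
```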
[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman commented on a change in pull request #10877: URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java ## @@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap)); } +@Override +public , P> KeyValueIterator prefixScan(final P prefix, final PS prefixKeySerializer) { Review comment: Can you add a test or two for this in `InMemoryLRUCacheStoreTest`? Alternatively, I think you should be able to just collect all the tests in `InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move them over to `AbstractKeyValueStoreTest` instead. That way you get test coverage for both of those plus a handful of other store classes at once, without having to copy the same test over and over across a bunch of different files. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ## @@ -26,13 +26,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.List; import java.util.NavigableMap; -import java.util.Set; import java.util.TreeMap; +import java.util.List; Review comment: Can you revert the changes in this file? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ## @@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() { assertNull(store.get("key4")); } +@Test +public void testPrefixScanInMemoryStoreNoCachingNoLogging() { +final String storeName = "prefixScanStore"; +final StoreBuilder> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) +.withCachingDisabled() +.withLoggingDisabled(); +topology +.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) +.addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") +.addSink("counts", OUTPUT_TOPIC_1, "processor1"); + +driver = new TopologyTestDriver(topology, props); + +final TestInputTopic inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); +final TestOutputTopic outputTopic1 = +driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + +inputTopic.pipeInput("key1", "value1"); +inputTopic.pipeInput("key2", "value2"); +inputTopic.pipeInput("key3", "value3"); +inputTopic.pipeInput("key1", "value4"); +assertTrue(outputTopic1.isEmpty()); + +final KeyValueStore store = driver.getKeyValueStore("prefixScanStore"); +final KeyValueIterator prefixScan = store.prefixScan("key", Serdes.String().serializer()); +final List> results = new ArrayList<>(); +while (prefixScan.hasNext()) { +final KeyValue next = prefixScan.next(); +results.add(next); +} + +assertEquals("key1", results.get(0).key); +assertEquals("value4", results.get(0).value); +assertEquals("key2", results.get(1).key); +assertEquals("value2", results.get(1).value); +assertEquals("key3", results.get(2).key); +assertEquals("value3", results.get(2).value); + +} + +@Test +public void testPrefixScanInMemoryStoreWithCachingNoLogging() { +final String storeName = "prefixScanStore"; +final StoreBuilder> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) 
+.withCachingEnabled() +.withLoggingDisabled(); +topology +.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) +.addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") +.addSink("counts", OUTPUT_TOPIC_1, "processor1"); + +driver = new TopologyTestDriver(topology, props); + +final TestInputTopic inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); +final TestOutputTopic outputTopic1 = +driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + +inputTopic.pipeInput("key1", "value1"); +inputTopic.pipeInput("key2", "value2"); +inputTopic.pipeInput("key3", "value3"); +inputTopic.pipeInput("key1", "value4"); +
[GitHub] [kafka] satishd opened a new pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd opened a new pull request #11033: URL: https://github.com/apache/kafka/pull/11033 - Added asynchronous API support for RemoteLogMetadataManager add/update/put methods. - Implemented the changes on the default topic-based RemoteLogMetadataManager. - Refactored the respective tests to cover the introduced asynchronous APIs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
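"Asynchronous API support" here generally means the add/update methods return a future instead of blocking until the metadata is recorded. A hedged Java sketch of that shape follows; the class and method names are illustrative stand-ins, not the committed RemoteLogMetadataManager interface.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative stand-in for a metadata store whose add/update methods return a
// CompletableFuture that completes once the update has been durably recorded,
// instead of blocking the caller. Names here are assumptions, not the Kafka API.
public class AsyncMetadataStore {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> addSegmentMetadata(final String segmentId) {
        return CompletableFuture.runAsync(() -> persist(segmentId), executor);
    }

    private void persist(final String segmentId) {
        // In a real manager this would publish to an internal metadata topic and wait
        // for it to be consumed back; elided here.
        System.out.println("persisted metadata for " + segmentId);
    }

    public void close() {
        executor.shutdown();
    }

    public static void main(final String[] args) {
        final AsyncMetadataStore store = new AsyncMetadataStore();
        // Callers that still need blocking semantics can simply join() the future.
        store.addSegmentMetadata("segment-0").join();
        store.close();
    }
}
```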
[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman commented on pull request #10877: URL: https://github.com/apache/kafka/pull/10877#issuecomment-878765736 @vamossagar12 looks like there's a checkstyle failure that's preventing the tests from running, can you fix that as well? Giving this a pass now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug
ableegoldman commented on pull request #10985: URL: https://github.com/apache/kafka/pull/10985#issuecomment-878760857 Responded to your comments @guozhangwang , let me know if that all makes sense or if you have any more concerns that need to be addressed in this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668410517 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set allTopics, Iterator unfilledConsumerIter = unfilledMembers.iterator(); // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota for (TopicPartition unassignedPartition : unassignedPartitions) { -if (!unfilledConsumerIter.hasNext()) { -if (unfilledMembers.isEmpty()) { -// Should not enter here since we have calculated the exact number to assign to each consumer -// There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners. +String consumer; +if (unfilledConsumerIter.hasNext()) { +consumer = unfilledConsumerIter.next(); +} else { +if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) { +// Should not enter here since we have calculated the exact number to assign to each consumer. +// This indicates issues in the assignment algorithm int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", -unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); + unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); throw new IllegalStateException("No more unfilled consumers to be assigned."); +} else if (unfilledMembers.isEmpty()) { +consumer = potentiallyUnfilledMembersAtMinQuota.poll(); +} else { +unfilledConsumerIter = unfilledMembers.iterator(); +consumer = unfilledConsumerIter.next(); } -unfilledConsumerIter = unfilledMembers.iterator(); } -String consumer = unfilledConsumerIter.next(); + List consumerAssignment = assignment.get(consumer); consumerAssignment.add(unassignedPartition); // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer -if (allRevokedPartitions.contains(unassignedPartition)) +// or else the partition was actually claimed by multiple previous owners and had to be invalidated from all +// members claimed ownedPartitions +if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition)) partitionsTransferringOwnership.put(unassignedPartition, consumer); int currentAssignedCount = consumerAssignment.size(); -int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota; -if (currentAssignedCount == expectedAssignedCount) { -if (currentAssignedCount == maxQuota) { -numMembersAssignedOverMinQuota++; -} +if (currentAssignedCount == minQuota) { unfilledConsumerIter.remove(); +potentiallyUnfilledMembersAtMinQuota.add(consumer); +} else if (currentAssignedCount == maxQuota) { +numMembersAssignedOverMinQuota++; +if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { +// We only start to iterate over the "potentially unfilled" members at minQuota after we've filled +// all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we +// should be done. 
But in case of some algorithmic error, just log a warning and continue to +// assign any remaining partitions within the assignment constraints +if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) { +log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, " Review comment: I responded to the above comment as well, but specifically here I think that to just check on that condition requires us to make assumptions about the algorithm's correctness up to this point (and the correctness of its assumptions). But if those are all correct then we would never reach this to begin with, so it's better to directly look for any remaining `unassignedPartitions` -- it's a san
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668409463 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set allTopics, // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions numMembersAssignedOverMinQuota++; +if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { +potentiallyUnfilledMembersAtMinQuota.clear(); Review comment: While I'm not really a fan of the `potentiallyUnfilledMembersAtMinQuota` logic (it's definitely awkward but I felt it was still the lesser evil in terms of complicating the code), I don't think we can get rid of it that easily. The problem is that when `minQuota != maxQuota`, and so far `currentNumMembersWithOverMinQuotaPartitions` < `expectedNumMembersWithOverMinQuotaPartitions`, then consumers that are filled up to exactly `minQuota` have to be considered potentially not yet at capacity since some will need one more partition, though not all. So this data structure is not just used to verify that everything is properly assigned after we've exhausted the `unassignedPartitions`, it's used to track which consumers can still receive another partition (ie, are "unfilled"). Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
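For readers following the quota terminology in this thread, the relationship between minQuota, maxQuota and the number of members expected to end up above minQuota is plain integer arithmetic. Here is a small standalone Java sketch; it is a simplification for illustration, not the assignor code itself.

```java
public class QuotaMath {
    public static void main(String[] args) {
        int totalPartitions = 10;
        int numConsumers = 3;

        // Every consumer gets at least minQuota partitions; `remainder` of them get one extra.
        int minQuota = totalPartitions / numConsumers;                          // 3
        int maxQuota = (int) Math.ceil((double) totalPartitions / numConsumers); // 4
        int expectedNumMembersOverMinQuota = totalPartitions % numConsumers;      // 1

        System.out.printf("minQuota=%d maxQuota=%d membersOverMinQuota=%d%n",
            minQuota, maxQuota, expectedNumMembersOverMinQuota);

        // When totalPartitions divides evenly, minQuota == maxQuota and the
        // "expected over minQuota" count is 0: the case discussed above where
        // members at exactly minQuota are already full.
    }
}
```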
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics, allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than -// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers +// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { -unfilledMembers.add(consumer); +potentiallyUnfilledMembersAtMinQuota.add(consumer); Review comment: For the 3rd & 4th suggested renamings, it's a bit subtle but this would actually be incorrect. In the case `minQuota == maxQuota`, the `expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of course we could go through and tweak the logic for this case, but I'd prefer not to mix that into this PR. For now I'll just clarify in the comments for these variables. (I did still rename them slightly to hopefully be more clear, and also in line with the new names of the other two data structures we renamed) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics, allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than -// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers +// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { -unfilledMembers.add(consumer); +potentiallyUnfilledMembersAtMinQuota.add(consumer); Review comment: For the 3rd & 4th suggested renamings, it's a bit subtle but this would actually be incorrect. In the case `minQuota == maxQuota`, the `expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of course we could go through and tweak the logic for this case, but I'd prefer not to mix that into this PR. For now I'll just clarify in the comments for these variables -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13074) Implement mayClean for MockLog
Jose Armando Garcia Sancio created KAFKA-13074: -- Summary: Implement mayClean for MockLog Key: KAFKA-13074 URL: https://issues.apache.org/jira/browse/KAFKA-13074 Project: Kafka Issue Type: Bug Reporter: Jose Armando Garcia Sancio The current implementation of MockLog doesn't implement maybeClean. It is expected that MockLog has the same semantics as KafkaMetadataLog. This is assumed to be true by a few of the test suites, like the raft simulation and the kafka raft client test context. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13074) Implement mayClean for MockLog
[ https://issues.apache.org/jira/browse/KAFKA-13074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13074: --- Labels: kip-500 (was: ) > Implement mayClean for MockLog > -- > > Key: KAFKA-13074 > URL: https://issues.apache.org/jira/browse/KAFKA-13074 > Project: Kafka > Issue Type: Bug >Reporter: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > The current implementation of MockLog doesn't implement maybeClean. It is expected > that MockLog has the same semantics as KafkaMetadataLog. This is assumed to be > true by a few of the test suites, like the raft simulation and the kafka raft > client test context. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668406080 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics, allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than -// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers +// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { -unfilledMembers.add(consumer); +potentiallyUnfilledMembersAtMinQuota.add(consumer); Review comment: Ack on the first two renamings, though I'd still want to prefix them with `unfilled` to emphasize that these structures only hold members that may potentially be assigned one or more partitions. ie, if `minQuota == maxQuota`, then `potentiallyUnfilledMembersAtMinQuota` should actually be empty, in which case `MembersWithExactMinQuotaPartitions` doesn't quite make sense. I'll clarify this in the comments as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation
[ https://issues.apache.org/jira/browse/KAFKA-13073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13073: --- Labels: kip-500 (was: ) > Simulation test fails due to inconsistency in MockLog's implementation > -- > > Key: KAFKA-13073 > URL: https://issues.apache.org/jira/browse/KAFKA-13073 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.0.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > Fix For: 3.0.0 > > > We are getting the following error on trunk > {code:java} > RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT > timestamp = 2021-07-12T16:26:55.663, > RaftEventSimulationTest:canRecoverAfterAllNodesKilled = > java.lang.RuntimeException: > Uncaught exception during poll of node 1 > |---jqwik--- > tries = 25| # of calls to property > checks = 25 | # of not rejected calls > generation = RANDOMIZED | parameters are randomly generated > after-failure = PREVIOUS_SEED | use the previous seed > when-fixed-seed = ALLOW | fixing the random seed is allowed > edge-cases#mode = MIXIN | edge cases are mixed in > edge-cases#total = 108| # of all combined edge cases > edge-cases#tried = 4 | # of edge cases tried in current run > seed = 8079861963960994566| random seed to reproduce generated values >Sample > -- > arg0: 4002 > arg1: 2 > arg2: 4{code} > I think there are a couple of issues here: > # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by > {{ReplicatedLog::startOffset()}} to determine the log start and when to load > a snapshot, while the {{MockLog}} implementation uses {{logStartOffset}} which > could be a different value. > # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}} so the log > start offset is always 0. > # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s > {{createNewSnapshot}} throws an exception when the snapshot id is less than > the log start offset. > Solutions: > To fix the error quoted above we only need to fix bullet point 3, but I think we > should fix all of the issues enumerated in this Jira. > For 1. we should change the {{MockLog}} implementation so that it uses > {{startOffset}} both externally and internally. > For 2. I will file another issue to track this implementation. > For 3. I think this validation is too strict. I think it is safe to simply > ignore any attempt by the state machine to create a snapshot with an id less > than the log start offset. We should return an {{Optional.empty()}} when the > snapshot id is less than the log start offset. This tells the user that it > doesn't need to generate a snapshot for that offset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio opened a new pull request #11032: KAFKA-13073: Inconsistent MockLog implementation
jsancio opened a new pull request #11032: URL: https://github.com/apache/kafka/pull/11032 Fix a simulation test failure by: 1. Relaxing the validation of the snapshot id against the log start offset when the state machine attempts to create a new snapshot. It is safe to just ignore the request instead of throwing an exception when the snapshot id is less than the log start offset. 2. Fixing the MockLog implementation so that it uses startOffset both externally and internally. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
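Point 1 of this fix amounts to turning an exception into an empty result. A minimal Java sketch of that behaviour follows, using a simplified stand-in rather than the actual ReplicatedLog/SnapshotWriter types.

```java
import java.util.Optional;

public class SnapshotValidation {

    // Simplified stand-in for a snapshot writer handle.
    static final class SnapshotWriter {
        final long snapshotOffset;

        SnapshotWriter(final long snapshotOffset) {
            this.snapshotOffset = snapshotOffset;
        }
    }

    private final long logStartOffset = 100L;

    // Before the fix: snapshotOffset < logStartOffset threw an exception.
    // After the fix: the request is ignored and the caller is told no snapshot is needed.
    public Optional<SnapshotWriter> createNewSnapshot(final long snapshotOffset) {
        if (snapshotOffset < logStartOffset) {
            return Optional.empty();
        }
        return Optional.of(new SnapshotWriter(snapshotOffset));
    }

    public static void main(final String[] args) {
        final SnapshotValidation log = new SnapshotValidation();
        System.out.println(log.createNewSnapshot(50L).isPresent());  // false: below log start
        System.out.println(log.createNewSnapshot(150L).isPresent()); // true
    }
}
```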
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668402587 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set allTopics, for (final TopicPartition tp : memberData.partitions) { // filter out any topics that no longer exist or aren't part of the current subscription if (allTopics.contains(tp.topic())) { -ownedPartitions.add(tp); + +if (!allPreviousPartitionsToOwner.containsKey(tp)) { +allPreviousPartitionsToOwner.put(tp, consumer); +ownedPartitions.add(tp); +} else { +String otherConsumer = allPreviousPartitionsToOwner.get(tp); +log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " Review comment: Good point, yes I would absolutely want/hope a user would report this as a bug. Changed to ERROR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668402323 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -121,7 +129,12 @@ private boolean allSubscriptionsEqual(Set allTopics, // If the current member's generation is higher, all the previously owned partitions are invalid if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { - membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); +allPreviousPartitionsToOwner.clear(); Review comment: In that case, it's never added to `consumerToOwnedPartitions` in the first place. This map is not pre-filled; it gets populated inside this loop. So if it's `< maxGeneration`, then we just insert an empty list into the map for that member's owned partitions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
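The bookkeeping described here (the map is populated lazily, and claims from an older generation contribute an empty list rather than being cleared later) can be shown in a few lines. The following Java sketch is a simplified illustration of that idea, not the assignor itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OwnedPartitionsBookkeeping {
    public static void main(String[] args) {
        int maxGeneration = 5;

        // What each member claims to own, and the generation that claim came from.
        Map<String, List<String>> claimedPartitions = new HashMap<>();
        Map<String, Integer> claimGeneration = new HashMap<>();
        claimedPartitions.put("consumer-a", List.of("t-0", "t-1"));
        claimGeneration.put("consumer-a", 5);
        claimedPartitions.put("consumer-b", List.of("t-2"));
        claimGeneration.put("consumer-b", 3); // stale generation

        // The map is populated inside the loop: members with a stale generation get an
        // empty list; they are never "pre-filled" and later cleared.
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        for (String consumer : claimedPartitions.keySet()) {
            boolean currentGeneration = claimGeneration.get(consumer) == maxGeneration;
            consumerToOwnedPartitions.put(consumer,
                currentGeneration ? new ArrayList<>(claimedPartitions.get(consumer)) : new ArrayList<>());
        }

        System.out.println(consumerToOwnedPartitions); // e.g. {consumer-a=[t-0, t-1], consumer-b=[]}
    }
}
```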
[jira] [Created] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation
Jose Armando Garcia Sancio created KAFKA-13073: -- Summary: Simulation test fails due to inconsistency in MockLog's implementation Key: KAFKA-13073 URL: https://issues.apache.org/jira/browse/KAFKA-13073 Project: Kafka Issue Type: Bug Components: controller, replication Affects Versions: 3.0.0 Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 We are getting the following error on trunk {code:java} RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT timestamp = 2021-07-12T16:26:55.663, RaftEventSimulationTest:canRecoverAfterAllNodesKilled = java.lang.RuntimeException: Uncaught exception during poll of node 1 |---jqwik--- tries = 25| # of calls to property checks = 25 | # of not rejected calls generation = RANDOMIZED | parameters are randomly generated after-failure = PREVIOUS_SEED | use the previous seed when-fixed-seed = ALLOW | fixing the random seed is allowed edge-cases#mode = MIXIN | edge cases are mixed in edge-cases#total = 108| # of all combined edge cases edge-cases#tried = 4 | # of edge cases tried in current run seed = 8079861963960994566| random seed to reproduce generated values Sample -- arg0: 4002 arg1: 2 arg2: 4{code} I think there are a couple of issues here: # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by {{ReplicatedLog::startOffset()}} to determine the log start and when to load a snapshot, while the {{MockLog}} implementation uses {{logStartOffset}} which could be a different value. # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}} so the log start offset is always 0. # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s {{createNewSnapshot}} throws an exception when the snapshot id is less than the log start offset. Solutions: To fix the error quoted above we only need to fix bullet point 3, but I think we should fix all of the issues enumerated in this Jira. For 1. we should change the {{MockLog}} implementation so that it uses {{startOffset}} both externally and internally. For 2. I will file another issue to track this implementation. For 3. I think this validation is too strict. I think it is safe to simply ignore any attempt by the state machine to create a snapshot with an id less than the log start offset. We should return an {{Optional.empty()}} when the snapshot id is less than the log start offset. This tells the user that it doesn't need to generate a snapshot for that offset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
ableegoldman commented on pull request #10986: URL: https://github.com/apache/kafka/pull/10986#issuecomment-878749974 Now ready for review @dajac @hachikuji @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery
chia7712 commented on a change in pull request #11006: URL: https://github.com/apache/kafka/pull/11006#discussion_r668398858 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File], val logDirAbsolutePath = dir.getAbsolutePath var hadCleanShutdown: Boolean = false try { -val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) +val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory { + private val factory = Executors.defaultThreadFactory() + private val threadNumber = new AtomicInteger(1) Review comment: Why do we need this counter? It seems to me `log-recovery` is good enough. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File], val logDirAbsolutePath = dir.getAbsolutePath var hadCleanShutdown: Boolean = false try { -val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) +val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory { Review comment: line#495 (`shutdown`) also creates a thread pool. Does it need better naming as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
ableegoldman commented on pull request #10986: URL: https://github.com/apache/kafka/pull/10986#issuecomment-878748345 Ok I realize we actually do have a test that reproduces this already: `ConsumerCoordinatorTest.testRebalanceWithMetadataChange`. This test sets up a case where a change in topic metadata triggers a rebalance after a member had joined the group, after which the change is reverted so that the metadata is ultimately the same. Then a `NOT_COORDINATOR` response is sent to fail the initial JoinGroup, and the test just verifies that the member attempts to rejoin until successful. It also verifies things like the number of times each rebalance callback is invoked, and the set of partitions that the callbacks receive. This test actually only failed in the COOPERATIVE case, which confirms that the behavior remains correct for the EAGER case. When following the COOPERATIVE protocol, the test was formerly assuming that the member would retain all partitions despite actually having its generation and memberId cleared when the initial JoinGroup is failed. So it was technically asserting the wrong behavior beforehand; just fixing this gives us a unit test for this patch after all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379536#comment-17379536 ] Luke Chen commented on KAFKA-12495: --- [~kkonstantine], could you take a look at the PR? I just found that this bug is blocking a v3.0 blocker flaky test. Please help! Thank you. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement the incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after the revoking rebalance completes, the member (worker) count will be the same > as in the previous round of rebalance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 is added after the 1st rebalance > completes and before the 2nd rebalance starts? Let's see what will happen. > Let's see this example (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previously revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which causes unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we don't allow consecutive revocations in two consecutive > rebalances (under the same leader), we end up with this uneven distribution > in this situation. We should allow a consecutive rebalance to have another > round of revocation that revokes the C/T to the other members in this case.
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5
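The rebalancing math behind the "expected" outcome above is just an even split of the total connectors and tasks across the current workers, with anyone above their target revoking the difference in a follow-up rebalance. Below is a simplified Java sketch of that calculation, not the actual IncrementalCooperativeAssignor code; the worker loads mirror the unbalanced example in the description.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RevocationMath {
    public static void main(String[] args) {
        // Current (unbalanced) load after the new worker W4 joined, as in the example above.
        Map<String, Integer> load = new LinkedHashMap<>();
        load.put("W1", 4);
        load.put("W2", 3);
        load.put("W3", 3);
        load.put("W4", 2);

        int total = load.values().stream().mapToInt(Integer::intValue).sum(); // 12 connectors + tasks
        int workers = load.size();
        int floor = total / workers;  // 3: everyone should end up at this target
        int extras = total % workers; // 0: no worker needs one extra here

        // Workers above their balanced target must revoke the difference in a further
        // round of rebalance; that follow-up revocation is what the fix allows.
        int assignedExtras = 0;
        for (Map.Entry<String, Integer> entry : load.entrySet()) {
            int target = floor + (assignedExtras < extras ? 1 : 0);
            if (assignedExtras < extras) {
                assignedExtras++;
            }
            int toRevoke = Math.max(0, entry.getValue() - target);
            System.out.printf("%s: assigned=%d target=%d revoke=%d%n",
                entry.getKey(), entry.getValue(), target, toRevoke);
        }
        // Output shows W1 revoking exactly one resource (AT3 in the worked example).
    }
}
```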
[GitHub] [kafka] showuon commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector
showuon commented on pull request #10367: URL: https://github.com/apache/kafka/pull/10367#issuecomment-878731886 @kkonstantine, I just found that this is a v3.0 blocker bug. Could you help take a look? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12629) Failing Test: RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379529#comment-17379529 ] Luke Chen commented on KAFKA-12629: --- After KAFKA-12677 was merged into trunk in build #310, there have been no RaftClusterTest-related test failures in builds #310 and #311. Marking this ticket as resolved. Thanks. > Failing Test: RaftClusterTest > - > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0 > > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13072) refactor RemoveMembersFromConsumerGroupHandler
Luke Chen created KAFKA-13072: - Summary: refactor RemoveMembersFromConsumerGroupHandler Key: KAFKA-13072 URL: https://issues.apache.org/jira/browse/KAFKA-13072 Project: Kafka Issue Type: Sub-task Reporter: Luke Chen Assignee: Luke Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9648) kafka server should resize backlog when create serversocket
[ https://issues.apache.org/jira/browse/KAFKA-9648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379515#comment-17379515 ] Haruki Okada edited comment on KAFKA-9648 at 7/13/21, 2:05 AM: --- Hi. We operate a Kafka cluster in our company which has 130+ brokers, 1.3M+ total socket server connections and 25K+ partitions. We faced very similar issue to https://issues.apache.org/jira/browse/KAFKA-9211 (i.e. producer slowing down mysteriously with small TCP packets) recently, and we figured out the cause. We hope our knowledge could help. h2. Environment: * Kafka broker version: 2.4.1 (but we suppose the version doesn't matter) * Kafka broker OS: CentOS7 (kernel 3.10.X) h2. Phenomenon: * Restart a broker process, and execute preferred leader election after the broker became in-sync * Some producers's node-request-latency to the broker got insanely higher than usual ** However no such high produce response latency is observed on broker-side metrics * As the result, producer batches couldn't be sent out in sufficient pace, then caused batch expiration h2. Analysis: * We observed TCP SYN cookies metric was increased at incidental time, with following dmesg message: ** {code:java} TCP: request_sock_TCP: Possible SYN flooding on port 22991. Sending cookies. Check SNMP counters.{code} * So we also suspected the phenomenon is due to `wscale` drop (as like described in this issue), but we doubt it because: ** Even with TCP SYN cookies, `wscale` should be available as long as TCP timestamp is enabled. (refs: [https://blog.cloudflare.com/syn-packet-handling-in-the-wild/]) ** In our environment, TCP timestamp is enabled. ** Also, generally, `wscale` (window scaling factor) is used for extending window beyond 65535 (max window size in TCP spec) on large-network round trip environment such as internet *** Our typical produce request size is smaller than that *** So it's hard to imagine that `wscale` drop causes such significant request-latency degradation * After several attempts to reproduce, we found out that receiver (i.e. broker in this context)'s `wscale` is inconsistent between producer and broker at incidental time. ** receiver's `wscale` advertised from broker -> producer along with SYN+ACK packet: 7 *** {code:java} 17:32:05.161213 IP BROKER.HOST. > CLIENT.: Flags [S.], seq 29292755, ack 17469019, win 28960, options [mss 1460,sackOK,TS val 25589601 ecr 9055245,nop,wscale 1], length 0{code} * ** *** (seq numbers are substituted with random value) ** actual receiver's `wscale` after established: 1 *** {code:java} [user@BROKER ~]$ /sbin/ss -e -i -t | less ... ESTAB 0 0 BROKER. CLIENT. 
timer:(keepalive,21min,0) uid:503 ino:15143860 sk:9ba25dc4f440 <-> ts sack cubic wscale:7,7 rto:201 rtt:0.179/0.006 ato:140 mss:1448 rcvmss:1448 advmss:1448 cwnd:10 bytes_acked:97054 bytes_received:18950537 segs_out:15094 segs_in:13941 send 647.2Mbps lastsnd:217 lastrcv:17 lastack:217 pacing_rate 1288.0Mbps rcv_rtt:1.875 rcv_space:29200{code} * ** *** `wscale:7,7` means that broker's receiver window scaling factor is 7 * Okay, then this inconsistency could explain the phenomenon as below: ** Premise: When `wscale` is enabled, TCP window size is calculated by `window_size * 2^wscale` ** When broker calculates advertised window size, it's bit-shifted to the right by `wscale` (== 7) *** [https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_output.c#L290] ** On the other hand, producer multiplies advertised window size by 1, which is conveyed through SYN+ACK ** As the result, window size became 64 times smaller than expected ** Then producer splits TCP packets to much smaller size than usual (possibly under MSS) *** TCP acks are delayed due to the conditions are not met ([https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_input.c#L4760]) * Last remaining question is "why such wscale inconsistency happened?" ** Read through the kernel source code, then we found that there's an issue in the logic to calculate wscale if TCP connection is established through syncookies *** It's fixed in this commit: [https://github.com/torvalds/linux/commit/909172a149749242990a6e64cb55d55460d4e417] h2. Solution: * 1. Upgrade linux kernel to at least v5.10 (which the patch is committed) * 2. Disable SYN cookies ** i.e. setting `net.ipv4.tcp_syncookies` kernel parameter to 0 ** With SYN cookies disabled, some SYN's are dropped on same situations but typically it's not be a serious problem thanks to SYN retries. Clients should reconnect soon. * 3. Adjust backlog size (as this ticket suggests) ** Even disabling SYN cookies work thanks to SYN retries, it will cause certain delay in TCP establishment if reconnect happened. ** So generally it's preferred to adjust backlog size as w
[jira] [Commented] (KAFKA-9648) kafka server should resize backlog when create serversocket
[ https://issues.apache.org/jira/browse/KAFKA-9648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379515#comment-17379515 ] Haruki Okada commented on KAFKA-9648: - Hi. We operate a Kafka cluster in our company which has 130+ brokers, 1.3M+ total socket server connections and 25K+ partitions. We faced a very similar issue to https://issues.apache.org/jira/browse/KAFKA-9211 (i.e. the producer slowing down mysteriously with small TCP packets) recently, and we figured out the cause. We hope our findings can help. h2. Environment: * Kafka broker version: 2.4.1 (but we suppose the version doesn't matter) * Kafka broker OS: CentOS7 (kernel 3.10.X) h2. Phenomenon: * Restart a broker process, and execute preferred leader election after the broker became in-sync * Some producers' node-request-latency to the broker became far higher than usual ** However, no such high produce response latency was observed in broker-side metrics * As a result, producer batches couldn't be sent out at a sufficient pace, which caused batch expiration h2. Analysis: * We observed that the TCP SYN cookies metric increased at the time of the incident, with the following dmesg message: ** {code:java} TCP: request_sock_TCP: Possible SYN flooding on port 22991. Sending cookies. Check SNMP counters.{code} * So we also suspected the phenomenon was due to a `wscale` drop (as described in this issue), but we doubted it because: ** Even with TCP SYN cookies, `wscale` should be available as long as TCP timestamps are enabled. (refs: [https://blog.cloudflare.com/syn-packet-handling-in-the-wild/]) ** In our environment, TCP timestamps are enabled. ** Also, `wscale` (window scaling factor) is generally used to extend the window beyond 65535 (the maximum window size in the TCP spec) on large round-trip networks such as the internet *** Our typical produce request size is smaller than that *** So it's hard to imagine that a `wscale` drop would cause such a significant request-latency degradation * After several attempts to reproduce the issue, we found that the receiver's (i.e. the broker's, in this context) `wscale` was inconsistent between producer and broker at the time of the incident. ** receiver's `wscale` advertised from broker -> producer along with the SYN+ACK packet: 1 *** {code:java} 17:32:05.161213 IP BROKER.HOST. > CLIENT.: Flags [S.], seq 29292755, ack 17469019, win 28960, options [mss 1460,sackOK,TS val 25589601 ecr 9055245,nop,wscale 1], length 0{code} *** (seq numbers are substituted with random values) ** actual receiver's `wscale` after the connection was established: 7 *** {code:java} [user@BROKER ~]$ /sbin/ss -e -i -t | less ... ESTAB 0 0 BROKER. CLIENT. timer:(keepalive,21min,0) uid:503 ino:15143860 sk:9ba25dc4f440 <-> ts sack cubic wscale:7,7 rto:201 rtt:0.179/0.006 ato:140 mss:1448 rcvmss:1448 advmss:1448 cwnd:10 bytes_acked:97054 bytes_received:18950537 segs_out:15094 segs_in:13941 send 647.2Mbps lastsnd:217 lastrcv:17 lastack:217 pacing_rate 1288.0Mbps rcv_rtt:1.875 rcv_space:29200{code} *** `wscale:7,7` means that the broker's receive window scaling factor is 7 * This inconsistency then explains the phenomenon as follows: ** Premise: when `wscale` is enabled, the TCP window size is calculated as `window_size * 2^wscale` ** When the broker calculates the advertised window size, it is bit-shifted to the right by its `wscale` (== 7) *** [https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_output.c#L290] ** On the other hand, the producer scales the advertised window size using the `wscale` of 1 that was conveyed in the SYN+ACK ** As a result, the window size became 64 times smaller than expected ** The producer then split TCP packets into much smaller sizes than usual (possibly under the MSS) *** TCP ACKs were delayed because the conditions ([https://github.com/torvalds/linux/blob/v3.10/net/ipv4/tcp_input.c#L4760]) were not met * The last remaining question is: why did such a wscale inconsistency happen? ** Reading through the kernel source code, we found that there's an issue in the logic that calculates wscale when a TCP connection is established through syncookies *** It's fixed in this commit: [https://github.com/torvalds/linux/commit/909172a149749242990a6e64cb55d55460d4e417] h2. Solution: * 1. Upgrade the Linux kernel to at least v5.10 (where the patch is included) * 2. Disable SYN cookies ** i.e. set the `net.ipv4.tcp_syncookies` kernel parameter to 0 ** With SYN cookies disabled, some SYNs are dropped in the same situations, but typically this is not a serious problem thanks to SYN retries. Clients should reconnect soon. * 3. Adjust the backlog size (as this ticket suggests) ** Even though disabling SYN cookies works thanks to SYN retries, it will cause a certain delay in TCP establishment when a reconnect happens. ** So generally it's preferable to adjust the backlog size as well, to avoid SYN drops in the first place. I think we should
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379505#comment-17379505 ] A. Sophie Blee-Goldman commented on KAFKA-13008: Thanks Guozhang, +1 on that approach (though I'll let Colin confirm whether #1 does make sense or not). We'll definitely need a Streams/client side fix if the 'real' fix is going to be on the broker side. My only question is whether this is something that might trip up other plain consumer client users in addition to Streams, and if so, whether there's anything we could do in the consumer client itself. But AFAIK it's only Streams that really relies on this metadata in this critical way, so I'm happy with the Streams-side fix as well > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Blocker > Fix For: 3.0.0 > > Attachments: image-2021-07-07-11-19-55-630.png > > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream stop processing the data for a long time while waiting for the > partition metadata. > > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So, now, the stream1-thread1 won't process any data until we got input > from partition A-1 (even if partition A-0 is buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). > > The reason why the stream1-thread1 won't process any data is because we can't > get the lag of partition A-1. And why we can't get the lag? It's because > # In KIP-695, we use consumer's cache to get the partition lag, to avoid > remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't have this partition. check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. > So, in the above example, the metadata cache for partition A-1 will be > cleared in step 4, and re-initialized (to null) in step 5 > # In KIP-227, we introduced a fetch session to have incremental fetch > request/response. 
That is, if the session existed, the client(consumer) will > get the update only when the fetched partition have update (ex: new data). > So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 > mins), it won't have update until next 30 mins, or wait for the fetch session > become inactive for (default) 2 mins to be evicted. Either case, the metadata > won't be updated for a while. > > In KIP-695, if we don't get the partition lag, we can't determine the > partition data status to do timestamp sync, so we'll keep waiting and not > processing any data. That's why this issue will happen. > > *Proposed solution:* > # If we don't get the current lag for a partition, or the current lag > 0, > we start to wait for max.task.idle.ms, and reset the deadline when we get the > partition lag, like what we did in previous KIP-353 > # Introduce a waiting time config when no partition lag, or partition lag > keeps > 0 (need KIP) > [~vvcephei] [~guozhang] , any suggestions? > > cc [~ableegoldman] [~mjsax] , this is the root cause that in > [https://github.com/apache/kafka/pull/10736,] we discussed and thought > there's a data lose situation. FYI. -- This message was sent by Atlassian Jira (v8.3.4#803
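For reference, a hedged sketch of the consumer-side API that the KIP-695 idling check relies on; `currentLag()` only reads the consumer's local cache rather than making a remote call, which is exactly why it can stay empty in the scenario described above. The bootstrap address and topic name are placeholders:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.OptionalLong;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CurrentLagExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topicA")); // placeholder topic
            consumer.poll(Duration.ofMillis(100));

            for (TopicPartition tp : consumer.assignment()) {
                // Empty until a fetch response for this partition has populated the cache,
                // e.g. right after the partition was re-assigned in a rebalance.
                OptionalLong lag = consumer.currentLag(tp);
                System.out.println(tp + " cached lag: "
                        + (lag.isPresent() ? lag.getAsLong() : "unknown (no cached metadata)"));
            }
        }
    }
}
{code}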
[GitHub] [kafka] ableegoldman commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
ableegoldman commented on pull request #10986: URL: https://github.com/apache/kafka/pull/10986#issuecomment-878706682 @hachikuji in the EAGER case, after the first `onJoinPrepare` / `onPartitionsRevoked`, the subscription would have been cleared. So any subsequent invocations of `onPartitionsRevoked` would be with an empty set of partitions. @everyone, I was having trouble getting a unit test that would actually verify this behavior, but I wanted to kick off discussion on the fix ASAP (for obvious reasons), so I opened the PR without one. I do intend to add a test, I just haven't had time to pursue that yet. Suggestions welcome :P -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
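As a side note, user-level rebalance listeners are unaffected by a repeated revocation callback as long as they key their work off the passed-in partitions; a minimal, hypothetical example:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical listener: an empty revocation set (e.g. a repeated callback after the
// subscription was already cleared) simply becomes a no-op.
public class SafeRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return; // nothing to clean up
        }
        partitions.forEach(tp -> System.out.println("flushing state for " + tp));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> System.out.println("initializing state for " + tp));
    }
}
```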
[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config
showuon commented on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-878703335 @rondagostino , thanks for the comments. Actually, I've already added 2 test cases for that: `shouldNotAllowDescribeBrokerWhileBrokerUpUsingZookeeper` -> to test `describe` is not allowed while brokers are up `shouldSupportDescribeBrokerBeforeBrokerUpUsingZookeeper` -> to test `describe` should be supported before brokers are up Please let me know if there's any other suggestion. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
Jason Gustafson created KAFKA-13071: --- Summary: Deprecate and remove --authorizer option in kafka-acls.sh Key: KAFKA-13071 URL: https://issues.apache.org/jira/browse/KAFKA-13071 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Now that we have all of the ACL APIs implemented through the admin client, we should consider deprecating and removing support for the --authorizer flag in kafka-acls.sh. -- This message was sent by Atlassian Jira (v8.3.4#803005)
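For context, a hedged sketch of the admin-client path that replaces the `--authorizer` flag (equivalent to `kafka-acls.sh --bootstrap-server ... --add`); the bootstrap address, topic name, and principal are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Allow User:alice to read from topic "payments" from any host.
            ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "payments", PatternType.LITERAL);
            AccessControlEntry allowRead =
                    new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW);

            admin.createAcls(Collections.singletonList(new AclBinding(topic, allowRead))).all().get();
        }
    }
}
{code}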
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334 > Otherwise, Streams app that run with 2.8 and before might not be compatible with Streams 3.0 because the retention time of the changelog topics created with older Streams apps will be smaller than the assumed retention time for Streams apps in 3.0. Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in-sync, I don't see how there could be data loss? -- Note that if the old behavior has a shorter retention time, it would be a "bug" if the state store keeps stuff longer now (even if retention-time is a _lower_ bound and keeping stuff longer is still correct; retention is not a strict bound), but it won't be a regression with regard to data loss, as we never guaranteed to keep the data longer than in the old behavior case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476 Correct, sliding-windows were added later and never had a default grace. Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows` they never got `until() / maintainMs()` -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API. However, looking into the code I am a little bit confused about `maintainMs()` in `TimeWindows`: ``` public long maintainMs() { return Math.max(maintainDurationMs, sizeMs + gracePeriodMs()); } ``` In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);`. Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace (ie, we ignore `maintainMs()` / `retention()` configuration). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
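A quick back-of-the-envelope sketch of the retention math under discussion (not Kafka code; the 2-hour window size is an assumed example) showing why defining the default grace as 24h minus the window size keeps the default `TimeWindows` retention at 24h, while a flat 24h grace pushes it above 24h:

```java
import java.time.Duration;

// Illustrative arithmetic only, mirroring the shape of TimeWindows#maintainMs():
// retention = max(maintainDurationMs, sizeMs + graceMs).
public class GraceRetentionMath {
    static long retentionMs(long sizeMs, long graceMs) {
        long maintainDurationMs = Duration.ofDays(1).toMillis(); // old default retention of 24h
        return Math.max(maintainDurationMs, sizeMs + graceMs);
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long size = Duration.ofHours(2).toMillis(); // assumed window size

        long flatGrace = day;                     // old default: a flat 24h grace
        long adjustedGrace = Math.max(day - size, 0); // proposed: 24h minus window size

        System.out.println("retention with flat 24h grace: " + retentionMs(size, flatGrace));     // 26h in ms
        System.out.println("retention with 24h-size grace: " + retentionMs(size, adjustedGrace)); // 24h in ms
    }
}
```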
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476 Correct, sliding-windows were added later and never had a default grace. Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows` they never got `until() / maintainMs()` -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API. However, looking into the code I am a little bit confused about `maintainMs()` in `TimeWindows`: ``` public long maintainMs() { return Math.max(maintainDurationMs, sizeMs + gracePeriodMs()); } ``` In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);`. Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace (ie, we ignore `maintainMs()` / `retention()` configuration anyway). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476 Correct, sliding-windows were added later and never had a default grace. Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows` they never got `until() / maintainMs()` -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API. However, looking into the code I am a little bit confused about `maintainMs()` in `TimeWindows`: ``` public long maintainMs() { return Math.max(maintainDurationMs, sizeMs + gracePeriodMs()); } ``` In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);`. Btw: yes, I believe we (technically) need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace. (Ie, we ignore `maintainMs()` / `retention()` configuration anyway). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334 > Otherwise, Streams app that run with 2.8 and before might not be compatible with Streams 3.0 because the retention time of the changelog topics created with older Streams apps will be smaller than the assumed retention time for Streams apps in 3.0. Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in-sync, I don't see how there could be data loss? -- Note that if the old behavior has a shorter retention time, it would be a "bug" if the state store keeps stuff longer now, but it won't be a regression with regard to data loss, as we never guaranteed to keep the data longer than in the old behavior case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334 > Otherwise, Streams app that run with 2.8 and before might not be compatible with Streams 3.0 because the retention time of the changelog topics created with older Streams apps will be smaller than the assumed retention time for Streams apps in 3.0. Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in-sync, I don't see how there could be data loss? -- Note that if the old behavior has a shorter retention time, it would be a "bug" if the state store keeps stuff longer now, but it won't be a regression with regard to data loss, as we never guaranteed to keep the data longer than in the old behavior case. (Ie, we ignore `maintainMs()` / `retention()` configuration anyway). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476 Correct, sliding-windows were added later and never had a default grace. Also, we deprecated `until() / maintainMs()` before we added sliding-windows, and because they do not inherit from `Windows` they never got `until() / maintainMs()` -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API. However, looking into the code I am a little bit confused about `maintainMs()` in `TimeWindows`: ``` public long maintainMs() { return Math.max(maintainDurationMs, sizeMs + gracePeriodMs()); } ``` In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);`. Btw: yes, I believe we need to update JoinWindows, too. However, IIRC, we never allowed setting the retention time using the new API: because for stream-stream joins, stores are not exposed via IQ, it does not make sense to set a larger retention time than size+grace. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12993: --- Fix Version/s: (was: 2.8.1) > Formatting of Streams 'Memory Management' docs is messed up > > > Key: KAFKA-12993 > URL: https://issues.apache.org/jira/browse/KAFKA-12993 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Blocker > Fix For: 3.0.0 > > > The formatting of this page is all messed up, starting in the RocksDB > section. It looks like there's a missing closing tag after the example > BoundedMemoryRocksDBConfig class -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379486#comment-17379486 ] ASF GitHub Bot commented on KAFKA-12993: ableegoldman merged pull request #361: URL: https://github.com/apache/kafka-site/pull/361 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Formatting of Streams 'Memory Management' docs is messed up > > > Key: KAFKA-12993 > URL: https://issues.apache.org/jira/browse/KAFKA-12993 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > The formatting of this page is all messed up, starting in the RocksDB > section. It looks like there's a missing closing tag after the example > BoundedMemoryRocksDBConfig class -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start
[ https://issues.apache.org/jira/browse/KAFKA-7493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-7493: - Assignee: (was: A. Sophie Blee-Goldman) > Rewrite test_broker_type_bounce_at_start > > > Key: KAFKA-7493 > URL: https://issues.apache.org/jira/browse/KAFKA-7493 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Major > Fix For: 3.0.0 > > > Currently, the test test_broker_type_bounce_at_start in > streams_broker_bounce_test.py is ignored. > As written, there are a couple of race conditions that lead to flakiness. > It should be possible to re-write the test to wait on log messages, as the > other tests do, instead of just sleeping to more deterministically transition > the test from one state to the next. > Once the test is fixed, the fix should be back-ported to all prior branches, > back to 0.10. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start
[ https://issues.apache.org/jira/browse/KAFKA-7493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-7493: -- Fix Version/s: (was: 3.0.0) 3.1.0 > Rewrite test_broker_type_bounce_at_start > > > Key: KAFKA-7493 > URL: https://issues.apache.org/jira/browse/KAFKA-7493 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Major > Fix For: 3.1.0 > > > Currently, the test test_broker_type_bounce_at_start in > streams_broker_bounce_test.py is ignored. > As written, there are a couple of race conditions that lead to flakiness. > It should be possible to re-write the test to wait on log messages, as the > other tests do, instead of just sleeping to more deterministically transition > the test from one state to the next. > Once the test is fixed, the fix should be back-ported to all prior branches, > back to 0.10. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax commented on a change in pull request #10953: URL: https://github.com/apache/kafka/pull/10953#discussion_r668344535 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java ## @@ -140,7 +140,7 @@ public static SessionWindows with(final Duration inactivityGap) { final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap"); final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix); -return new SessionWindows(inactivityGapMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD); +return new SessionWindows(inactivityGapMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0)); Review comment: Is this correct? The old `maintainMs` did use `Math.max(grace, inactivityGap)` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax edited a comment on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334 > Otherwise, Streams app that run with 2.8 and before might not be compatible with Streams 3.0 because the retention time of the changelog topics created with older Streams apps will be smaller than the assumed retention time for Streams apps in 3.0. Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in-sync, I don't see how there could be data loss? -- Note that if the old behavior has a shorter retention time, it would be a "bug" if the state store keeps stuff longer now, but it won't be a regression with regard to data loss, as we never guaranteed to keep the data longer than in the old behavior case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax commented on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878690334 > Otherwise, Streams app that run with 2.8 and before might not be compatible with Streams 3.0 because the retention time of the changelog topics created with older Streams apps will be smaller than the assumed retention time for Streams apps in 3.0. Why would this be fatal? It seems in 3.0 the retention time would potentially become _larger_ (if a new changelog topic is created). And for any existing application, we won't reconfigure the topic config. While I agree that we should keep the behavior in-sync, I don't see how there could be data loss? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379482#comment-17379482 ] Guozhang Wang commented on KAFKA-13008: --- Thanks for the great find [~showuon]! I took a look at the server-side code, and I think we can consider two things: 1) slightly augment the session handling logic so that within a session, if a partition was newly requested (here, we would not try to distinguish whether it was requested for the very first time, or it was re-added after a while), even if the requested position has reached the log end i.e. there's no data to return, we still return in the response to encode the log end information. WDYT [~cmccabe]? 2) since 1) would be a broker change and even if we do that, it may not help all cases for streams, we would still need some remedies. One (somewhat hacky..) idea is to actually pay the round-trip in such cases (only when the config is set to >= 0), but that since fetch request would not do for old versioned brokers, we would use the offset request (either consumer or admin has the API to do that) to get the log end offset. In fact, in Streams when we get the assigned partitions we always need to get the log end offset for changes at first to check if any restoration is needed, we can, just add source topic partitions as well in that phase and expose as the initial values for the main consumer as well. Note this is only done once after every rebalance, and no more. > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Blocker > Fix For: 3.0.0 > > Attachments: image-2021-07-07-11-19-55-630.png > > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream stop processing the data for a long time while waiting for the > partition metadata. > > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So, now, the stream1-thread1 won't process any data until we got input > from partition A-1 (even if partition A-0 is buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). 
> > The reason why the stream1-thread1 won't process any data is because we can't > get the lag of partition A-1. And why we can't get the lag? It's because > # In KIP-695, we use consumer's cache to get the partition lag, to avoid > remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't have this partition. check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. > So, in the above example, the metadata cache for partition A-1 will be > cleared in step 4, and re-initialized (to null) in step 5 > # In KIP-227, we introduced a fetch session to have incremental fetch > request/response. That is, if the session existed, the client(consumer) will > get the update only when the fetched partition have update (ex: new data). > So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 > mins), it won't have update until next 30 mins, or wait for the fetch session > become inactive for (default) 2 mins to be evicted. Either case, the metadata > won't be updated for a while. > > In KIP-695, if we don't get the partition lag, we can't determine the > partitio
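A hedged sketch of option 2 above, using the existing `Consumer#endOffsets()` API to snapshot log-end offsets right after a rebalance so a lag estimate exists before the first fetch response arrives; the helper name and structure are illustrative only, not Streams code:

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class InitialLagSnapshot {
    // Illustrative helper: one extra ListOffsets round trip per rebalance, as described above.
    public static Map<TopicPartition, Long> initialLags(Consumer<?, ?> consumer,
                                                        Collection<TopicPartition> assigned) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assigned);

        Map<TopicPartition, Long> lags = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            // position() may itself block until a valid fetch position is established.
            long position = consumer.position(entry.getKey());
            lags.put(entry.getKey(), Math.max(entry.getValue() - position, 0L));
        }
        return lags;
    }
}
{code}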
[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax commented on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-878687476 Correct, sliding-windows were added later and never had a default grace. Also, we deprecated `until() / maintainMs()` before sliding-windows were added, and because sliding-windows do not inherit from `Windows` they never got `until() / maintainMs()` -- thus, I don't think we need to update them. Note that there is an interplay between grace period and retention time in the old API. However, looking into the code of the different classes, it seems only `TimeWindow` needs to be updated, because it implements: ``` public long maintainMs() { return Math.max(maintainDurationMs, sizeMs + gracePeriodMs()); } ``` In contrast, `JoinWindows` used `Math.max(maintainDurationMs, size());` and `SessionWindows` used `Math.max(maintainDurationMs, gapMs);` -- thus, their retention-time should not be affected by any change to grace-period? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests
mjsax commented on a change in pull request #10944: URL: https://github.com/apache/kafka/pull/10944#discussion_r668337811 ## File path: tests/kafkatest/tests/streams/streams_eos_test.py ## @@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier) verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) def add_streams(self, processor): -with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: -processor.start() -self.wait_for_startup(monitor, processor) +with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: +with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor: +processor.start() +self.wait_for_running(stdout_monitor, processor) +self.wait_for_commit(log_monitor, processor) def add_streams2(self, running_processor, processor_to_be_started): -with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor: -self.add_streams(processor_to_be_started) -self.wait_for_startup(monitor, running_processor) +with running_processor.node.account.monitor_log(running_processor.LOG_FILE) as log_monitor: +with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as stdout_monitor: +self.add_streams(processor_to_be_started) +self.wait_for_running(stdout_monitor, running_processor) +self.wait_for_commit(log_monitor, running_processor) def add_streams3(self, running_processor1, running_processor2, processor_to_be_started): -with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor: -self.add_streams2(running_processor2, processor_to_be_started) -self.wait_for_startup(monitor, running_processor1) +with running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as log_monitor: +with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as stdout_monitor: +self.add_streams2(running_processor2, processor_to_be_started) +self.wait_for_running(stdout_monitor, running_processor1) +self.wait_for_commit(log_monitor, running_processor1) def stop_streams(self, processor_to_be_stopped): with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2: processor_to_be_stopped.stop() self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING") def stop_streams2(self, keep_alive_processor, processor_to_be_stopped): -with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor: -self.stop_streams(processor_to_be_stopped) -self.wait_for_startup(monitor, keep_alive_processor) +with keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as log_monitor: +with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as stdout_monitor: +self.stop_streams(processor_to_be_stopped) +self.wait_for_running(stdout_monitor, keep_alive_processor) +self.wait_for_commit(log_monitor, keep_alive_processor) def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped): -with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor: -self.stop_streams2(keep_alive_processor2, processor_to_be_stopped) -self.wait_for_startup(monitor, keep_alive_processor1) +with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor: +with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor: 
+self.stop_streams2(keep_alive_processor2, processor_to_be_stopped) +self.wait_for_running(stdout_monitor, keep_alive_processor1) +self.wait_for_commit(log_monitor, keep_alive_processor1) def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted): -with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1: -with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2: -processor_to_be_aborted.stop_nodes(False) -self.wait_for_startup(monitor2, keep_alive_processor2) -self.wait_for_startup(monitor1, keep_alive_processor1) - -def wait_for_startup(self, monitor, processor): +with keep_alive_processor1.n
[GitHub] [kafka] cmccabe commented on pull request #11031: KAFKA-13067 Add internal config to lower the metadata log segment size
cmccabe commented on pull request #11031: URL: https://github.com/apache/kafka/pull/11031#issuecomment-878677225 Looks like a test needs to be fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI
mjsax commented on a change in pull request #10994: URL: https://github.com/apache/kafka/pull/10994#discussion_r668331269 ## File path: docs/streams/developer-guide/processor-api.html ## @@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a single Processor object by calling init() on it again after close(). -When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors: - (1) If #forward() is called within #process() the output record inherits the input record timestamp. - (2) If #forward() is called within punctuate() the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time). - Note, that #forward() also allows to change the default behavior by passing a custom timestamp for the output record. -Specifically, ProcessorContext#schedule() accepts a user Punctuator callback interface, which triggers its punctuate() -API method periodically based on the PunctuationType. The PunctuationType determines what notion of time is used + +The Processor interface takes two sets of generic parameters: +KIn, VIn, KOut, VOut. These define the input and output types +that the processor implementation can handle. KIn and +VIn define the key and value types that will be passed +to process(). +Likewise, KOut and VOut +define the forwarded key and value types that ProcessorContext#forward() +will accept. If your processor does not forward any records at all (or if it only forwards +null keys or values), +a best practice is to set the output generic type argument to +Void. +If it needs to forward multiple types that don't share a common superclass, you will +have to set the output generic type argument to Object. + + +Both the Processor#process() +and the ProcessorContext#forward() +methods handle precords in the form of the Record+data class. This class gives you access to the key components of a Kafka record: +the key, value, timestamp and headers. When forwarding records, you can use the +constructor to create a new Record +from scratch, or you can use the convenience builder methods to replace one of the +Record's properties Review comment: `properties` -> `fields` (or `elements`) ? ## File path: docs/streams/developer-guide/processor-api.html ## @@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a single Processor object by calling init() on it again after close(). -When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors: - (1) If #forward() is called within #process() the output record inherits the input record timestamp. - (2) If #forward() is called within punctuate() the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time). - Note, that #forward() also allows to change the default behavior by passing a custom timestamp for the output record. -Specifically, ProcessorContext#schedule() accepts a user Punctuator callback interface, which triggers its punctuate() -API method periodically based on the PunctuationType. The PunctuationType determines what notion of time is used + +The Processor interface takes two sets of generic parameters: +KIn, VIn, KOut, VOut. These define the input and output types +that the processor implementation can handle. KIn and +VIn define the key and value types that will be passed +to process(). 
+Likewise, KOut and VOut +define the forwarded key and value types that ProcessorContext#forward() +will accept. If your processor does not forward any records at all (or if it only forwards +null keys or values), +a best practice is to set the output generic type argument to +Void. +If it needs to forward multiple types that don't share a common superclass, you will +have to set the output generic type argument to Object. + + +Both the Processor#process() +and the ProcessorContext#forward() +methods handle precords in the form of the Record +data class. This class gives you access to the key components of a Kafka record: Review comment: `key` -> `main` (to avoid confusion
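A minimal example of the generics and `Record` usage described in this doc change; the processor and its value mapping are made up purely for illustration:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative processor: consumes <String, String> records and forwards <String, Integer>,
// so the output types show up in KOut/VOut instead of falling back to Object.
public class LengthProcessor implements Processor<String, String, String, Integer> {
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // withValue() keeps the key, timestamp, and headers and swaps only the value.
        final int length = record.value() == null ? 0 : record.value().length();
        context.forward(record.withValue(length));
    }

    @Override
    public void close() { }
}
```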
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379462#comment-17379462 ] A. Sophie Blee-Goldman commented on KAFKA-13008: {quote}Re-reading KIP-227, it seems like there should be a way for the client to add re-acquired partitions like this to the incremental fetch request so that it can reinitialize its metadata cache. In other words, it seems like getting a partition assigned that you haven't owned for a while is effectively the same case as getting a partition that you've never owned, and there does seem to be a mechanism for the latter. {quote} Thanks John, that is exactly what I was trying to suggest above, but I may have mungled it with my lack of understanding of the incremental fetch design. Given how long this bug went unnoticed and the in-depth investigation it took to uncover the bug (again, nicely done [~showuon]), it seems like any user of the plain consumer client in addition to Streams could be easily tripped up by this. And just personally, I had to read the analysis twice to really understand what was going on, since the behavior was/is so unintuitive to me. > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Blocker > Fix For: 3.0.0 > > Attachments: image-2021-07-07-11-19-55-630.png > > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream stop processing the data for a long time while waiting for the > partition metadata. > > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So, now, the stream1-thread1 won't process any data until we got input > from partition A-1 (even if partition A-0 is buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). > > The reason why the stream1-thread1 won't process any data is because we can't > get the lag of partition A-1. And why we can't get the lag? It's because > # In KIP-695, we use consumer's cache to get the partition lag, to avoid > remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't have this partition. 
check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. > So, in the above example, the metadata cache for partition A-1 will be > cleared in step 4, and re-initialized (to null) in step 5 > # In KIP-227, we introduced a fetch session to have incremental fetch > request/response. That is, if the session existed, the client(consumer) will > get the update only when the fetched partition have update (ex: new data). > So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 > mins), it won't have update until next 30 mins, or wait for the fetch session > become inactive for (default) 2 mins to be evicted. Either case, the metadata > won't be updated for a while. > > In KIP-695, if we don't get the partition lag, we can't determine the > partition data status to do timestamp sync, so we'll keep waiting and not > processing any data. That's why this issue will happen. > > *Proposed solution:* > # If we don't get the current lag for a partition, or the current lag > 0, > we start to wait for max.task.idle.ms, and reset the deadline when we get the > pa
[GitHub] [kafka] ableegoldman commented on a change in pull request #11009: MINOR: update doc for default assignor change
ableegoldman commented on a change in pull request #11009: URL: https://github.com/apache/kafka/pull/11009#discussion_r668314609 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -114,15 +114,17 @@ "ordered by preference, of supported partition assignment strategies that the client will use to distribute " + "partition ownership amongst consumer instances when group management is used. Available options are:" + "" + -"org.apache.kafka.clients.consumer.RangeAssignor: The default assignor, which works on a per-topic basis." + +"org.apache.kafka.clients.consumer.RangeAssignor: Assigns partitions on a per-topic basis." + "org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion." + "org.apache.kafka.clients.consumer.StickyAssignor: Guarantees an assignment that is " + "maximally balanced while preserving as many existing partition assignments as possible." + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor: Follows the same StickyAssignor " + "logic, but allows for cooperative rebalancing." + "" + +"The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use RangeAssignor to do assignment, " + +"but just needs 1 rolling bounce to upgrade to CooperativeStickyAssignor" + Review comment: ```suggestion "The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " + "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list" + ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
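A hedged sketch of the two-step upgrade that the suggested wording describes; the property values shown are the only configuration assumed, everything else is left at defaults:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorUpgradeConfig {
    // Step 1: run with both assignors listed (RangeAssignor still does the assigning).
    public static Properties step1BothAssignors() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  Arrays.asList(RangeAssignor.class.getName(),
                                CooperativeStickyAssignor.class.getName()));
        return props;
    }

    // Step 2: a single rolling bounce that drops RangeAssignor switches the group
    // over to the cooperative protocol.
    public static Properties step2CooperativeOnly() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
```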
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -246,6 +265,19 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } +@Override +public boolean hasDeleteHorizonMs() { +return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; +} + +@Override +public long deleteHorizonMs() { Review comment: I've removed the `hasDeleteHorizonMs` function from the interface. I've made it so the DefaultRecordBatch still has the function as a private helper, but the logic overall is pretty much unchanged. I've also made the change to return OptionalLong for `deleteHorizonMs`, trying to keep the overall logic the same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -246,6 +265,19 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } +@Override +public boolean hasDeleteHorizonMs() { +return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; +} + +@Override +public long deleteHorizonMs() { Review comment: I've removed the `hasDeleteHorizonMs` function from the interface. I've made it so the DefaultRecordBatch still has the function as a private helper, but the logic overall is pretty much unchanged. I've also made the change to return OptionalLong for `deleteHorizonMs` to see how it looks. I think that the current usage in the cases for which this value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, so I am thinking it could be cleaner to keep it as a `long` and contain the logic for returning the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -246,6 +265,19 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } +@Override +public boolean hasDeleteHorizonMs() { +return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; +} + +@Override +public long deleteHorizonMs() { Review comment: I've removed the `hasDeleteHorizonMs` function from the interface. I've made it so the DefaultRecordBatch still has the function as a private helper, but the logic overall is pretty much unchanged. I've also made the change to return OptionalLong for `deleteHorizonMs` to see how it looks. I think that the current usage in the cases for which this value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, so I am thinking it could be cleaner to keep it as a `long` and contain the logic for the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668304063 ## File path: checkstyle/suppressions.xml ## @@ -57,7 +57,7 @@ Review comment: the same suppression is on L54 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668303344 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -167,21 +167,9 @@ public void ensureValid() { * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty */ public long baseTimestamp() { -return buffer.getLong(FIRST_TIMESTAMP_OFFSET); -} - -/** - * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the - * timestamp type of the batch is log append time. - * - * @return The first timestamp if a record has been appended, unless the delete horizon has been set - * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set - */ -public long firstTimestamp() { -final long baseTimestamp = baseTimestamp(); if (hasDeleteHorizonMs()) return RecordBatch.NO_TIMESTAMP; Review comment: yes you are right. sorry for the mistake. It should be fixed now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668302886 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -246,6 +265,19 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } +@Override +public boolean hasDeleteHorizonMs() { +return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; +} + +@Override +public long deleteHorizonMs() { Review comment: I've removed the `hasDeleteHorizonMs` function from the interface. I've made it so the DefaultRecordBatch still has the function as a private helper, but the logic overall is pretty much unchanged. I've also made the change to return OptionalLong for `deleteHorizonMs` to see how it looks. I think that the current usage in the cases for which this value returns `OptionalLong.Empty` is to just use `RecordBatch.NO_TIMESTAMP`, so I am thinking it's just cleaner to keep it as a `long` and contain the logic for the deleteHorizonMs value or `RecordBatch.NO_TIMESTAMP`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
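As context for the OptionalLong-versus-long discussion above, here is a simplified sketch of the two shapes being weighed; the flag value and field layout are assumptions for illustration, not the exact DefaultRecordBatch code.
```java
import java.util.OptionalLong;

public class DeleteHorizonSketch {
    // Assumed flag bit, for illustration only; the real mask lives in DefaultRecordBatch.
    private static final int DELETE_HORIZON_FLAG_MASK = 0x40;
    private static final long NO_TIMESTAMP = -1L;

    private final int attributes;
    private final long baseTimestamp; // holds the delete horizon when the flag is set

    public DeleteHorizonSketch(int attributes, long baseTimestamp) {
        this.attributes = attributes;
        this.baseTimestamp = baseTimestamp;
    }

    private boolean hasDeleteHorizonMs() {
        return (attributes & DELETE_HORIZON_FLAG_MASK) > 0;
    }

    // Shape 1: expose the horizon as an OptionalLong.
    public OptionalLong deleteHorizonMs() {
        return hasDeleteHorizonMs() ? OptionalLong.of(baseTimestamp) : OptionalLong.empty();
    }

    // Shape 2: keep a plain long and fall back to NO_TIMESTAMP, as the comment suggests.
    public long deleteHorizonMsOrDefault() {
        return hasDeleteHorizonMs() ? baseTimestamp : NO_TIMESTAMP;
    }
}
```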
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r668300841 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ## @@ -156,13 +161,27 @@ public void ensureValid() { } /** - * Get the timestamp of the first record in this batch. It is always the create time of the record even if the + * Gets the base timestamp of the batch which is used to calculate the timestamp deltas. + * + * @return The base timestamp or + * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty + */ +public long baseTimestamp() { Review comment: I've updated RecordIterator. I also expanded on `MemoryRecordsTest.testBaseTimestampToDeleteHorizonConversion` to check the record timestamp to verify it is the correct value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
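For readers less familiar with the batch format, the reason baseTimestamp matters to the iterator and the test above is that per-record timestamps are stored as deltas against it; the sketch below shows the reconstruction in simplified form, with illustrative names and values.
```java
public class TimestampDeltaSketch {
    // A record's timestamp is rebuilt from the batch base timestamp plus the record's stored delta.
    public static long recordTimestamp(long batchBaseTimestamp, long recordTimestampDelta) {
        return batchBaseTimestamp + recordTimestampDelta;
    }

    public static void main(String[] args) {
        long base = 1_626_000_000_000L; // hypothetical base timestamp carried by the batch
        long delta = 42L;               // hypothetical per-record delta
        System.out.println(recordTimestamp(base, delta)); // prints 1626000000042
    }
}
```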
[GitHub] [kafka] kowshik commented on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik commented on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-878641126 @junrao Thanks for the review. I ran load tests on the changes from this PR; there weren't any new regressions (i.e. latency regressions or errors) that I noticed, except for an issue I found that looks unrelated to this PR. It's described in this Jira: https://issues.apache.org/jira/browse/KAFKA-13070. The load test was run on a 6-broker cluster with 250GB SSD disks:
* Produce/consume on a test topic with 2000 partitions (~1000+ replica count per broker).
* Per-topic # of producers = 6.
* Produce ingress per broker = ~20.5MBps.
* Per-topic # of consumers = 6.
* # of consumer groups = 3.
* Test duration: ~1h.
Mid-way through the test, I rolled the cluster under load to check how the cluster behaved. Overall things looked OK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
Kowshik Prakasam created KAFKA-13070: Summary: LogManager shutdown races with periodic work scheduled by the instance Key: KAFKA-13070 URL: https://issues.apache.org/jira/browse/KAFKA-13070 Project: Kafka Issue Type: Bug Reporter: Kowshik Prakasam In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence, causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest:
```
// set val maxLogAgeMs = 6 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
  val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
  val logFile = new File(logDir, name + "-0")
  assertTrue(logFile.exists)

  log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
  log.updateHighWatermark(log.logEndOffset)

  logManager.shutdown()

  assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))

  time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)

  logManager = null
}
```
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
guozhangwang commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r668293781 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -186,16 +209,25 @@ private boolean allSubscriptionsEqual(Set allTopics, consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; List assignedPartitions = new ArrayList<>(); -// Reassign previously owned partitions to the expected number +// Reassign previously owned partitions, up to the expected number of partitions per consumer for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); +for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { +if (ownedPartitions.contains(doublyClaimedPartition)) { +log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " Review comment: Same here: in our current code this should never happen, so what about log as ERROR? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics, allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than -// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers +// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { -unfilledMembers.add(consumer); +potentiallyUnfilledMembersAtMinQuota.add(consumer); Review comment: Honestly it took me quite a while to understand the fix :P After understanding that I think maybe it's better to rename these two collections more explicitly: 1) `unfilledMembers` -> `MembersWithLessThanMinQuotaPartitions`. 2) `potentiallyUnfilledMembersAtMinQuota` -> `MembersWithExactMinQuotaPartitions`. 
And also (since the maxQuota is always either == minQuota or minQuota + 1): 3) `expectedNumMembersAssignedOverMinQuota` -> `expectedNumMembersWithMaxQuota` 4) `numMembersAssignedOverMinQuota` -> `numMembersWithMaxQuota` ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java ## @@ -555,7 +558,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() { assignor.assign(partitionsPerTopic, subscriptions); } -@Timeout(40) +@Timeout(60) Review comment: +1 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set allTopics, for (final TopicPartition tp : memberData.partitions) { // filter out any topics that no longer exist or aren't part of the current subscription if (allTopics.contains(tp.topic())) { -ownedPartitions.add(tp); + +if (!allPreviousPartitionsToOwner.containsKey(tp)) { +allPreviousPartitionsToOwner.put(tp, consumer); +ownedPartitions.add(tp); +} else { +String otherConsumer = allPreviousPartitionsToOwner.get(tp); +log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " Review comment: nit: do you think we should log at ERROR since this is not expected really? Right now we would sort of "hide" such bugs and still be able to proceed silently; I feel we should shouting out such scenarios a bit louder in logs. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set allTopics, // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partit
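To illustrate the double-claim handling discussed in these comments, here is a standalone sketch of the idea: the first consumer to claim a partition keeps it, and later claims are only logged (the thread debates WARN versus ERROR). The types and names are simplified stand-ins, not the assignor's real state.
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoubleClaimSketch {
    public static Map<String, List<String>> resolveClaims(Map<String, List<String>> claimedByConsumer) {
        Map<String, String> partitionToOwner = new HashMap<>();
        Map<String, List<String>> ownedPartitions = new HashMap<>();
        claimedByConsumer.forEach((consumer, partitions) -> {
            List<String> kept = new ArrayList<>();
            for (String tp : partitions) {
                String previousOwner = partitionToOwner.putIfAbsent(tp, consumer);
                if (previousOwner == null) {
                    kept.add(tp); // first claim wins
                } else {
                    // In the PR this is a log.warn/log.error; the partition stays with previousOwner.
                    System.err.printf("Partition %s claimed by both %s and %s%n", tp, previousOwner, consumer);
                }
            }
            ownedPartitions.put(consumer, kept);
        });
        return ownedPartitions;
    }
}
```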
[GitHub] [kafka] dielhennr commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default
dielhennr commented on a change in pull request #11011: URL: https://github.com/apache/kafka/pull/11011#discussion_r668286722 ## File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.message.DefaultPrincipalData; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; + +import java.nio.ByteBuffer; + +public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde { Review comment: @hachikuji Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
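For readers following KAFKA-13051, here is a rough sketch of what a principal serde built on the generated DefaultPrincipalData message could look like; the setter and accessor names and the version handling below are assumptions for illustration and may differ from the merged implementation.
```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.message.DefaultPrincipalData;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

// Hypothetical sketch only: setter/accessor names on the generated DefaultPrincipalData are assumed.
public class PrincipalSerdeSketch {
    public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
        DefaultPrincipalData data = new DefaultPrincipalData()
            .setType(principal.getPrincipalType())   // assumed setter
            .setName(principal.getName());           // assumed setter
        return MessageUtil.toVersionPrefixedBytes(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION, data);
    }

    public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        short version = buffer.getShort();
        if (version < DefaultPrincipalData.LOWEST_SUPPORTED_VERSION
                || version > DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION) {
            throw new SerializationException("Invalid principal data version " + version);
        }
        DefaultPrincipalData data = new DefaultPrincipalData(new ByteBufferAccessor(buffer), version);
        return new KafkaPrincipal(data.type(), data.name()); // assumed accessors
    }
}
```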
[jira] [Created] (KAFKA-13069) Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde
Ron Dagostino created KAFKA-13069: - Summary: Add magic number to DefaultKafkaPrincipalBuilder.KafkaPrincipalSerde Key: KAFKA-13069 URL: https://issues.apache.org/jira/browse/KAFKA-13069 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0, 3.0.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 3.0.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-878622218 @ijuma @rajinisivaram Please let me know if we are good to merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-878621846 Failed tests:
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout()
Build / JDK 16 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
My local runs:
> Task :core:test
...
TransactionsTest > testSendOffsetsToTransactionTimeout() PASSED
TransactionsTest > testAbortTransactionTimeout() PASSED
...
ConsumerBounceTest > testCloseDuringRebalance() FAILED
org.opentest4j.AssertionFailedError: Rebalance did not complete in time ==> expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:400)
at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:414)
at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:381)
But testCloseDuringRebalance() seems irrelevant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
hachikuji commented on pull request #10986: URL: https://github.com/apache/kafka/pull/10986#issuecomment-878621043 To clarify, from the perspective of the eager protocol, how would this case look? Would we get multiple calls to `onPartitionsRevoked` with the same set of partitions or something else? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11030: MINOR: Unmarking raft quorum configs as internal
cmccabe merged pull request #11030: URL: https://github.com/apache/kafka/pull/11030 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10091) Improve task idling
[ https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10091. -- Resolution: Fixed > Improve task idling > --- > > Key: KAFKA-10091 > URL: https://issues.apache.org/jira/browse/KAFKA-10091 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > When Streams is processing a task with multiple inputs, each time it is ready > to process a record, it has to choose which input to process next. It always > takes from the input for which the next record has the least timestamp. The > result of this is that Streams processes data in timestamp order. However, if > the buffer for one of the inputs is empty, Streams doesn't know what > timestamp the next record for that input will be. > Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address > this issue. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization] > The config allows Streams to wait some amount of time for data to arrive on > the empty input, so that it can make a timestamp-ordered decision about which > input to pull from next. > However, this config can be hard to use reliably and efficiently, since what > we're really waiting for is the next poll that _would_ return data from the > empty input's partition, and this guarantee is a function of the poll > interval, the max poll interval, and the internal logic that governs when > Streams will poll again. > The ideal case is you'd be able to guarantee at a minimum that _any_ amount > of idling would guarantee you poll data from the empty partition if there's > data to fetch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
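As a toy illustration of the input-choice problem this ticket describes: with records buffered on both inputs the task can pick the smaller head timestamp, but with one buffer empty the decision is unknowable, which is exactly the window max.task.idle.ms is meant to cover. The sketch below is illustrative only and is not the Streams implementation.
```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

public class ChooseNextInputSketch {
    // Returns the input ("A" or "B") whose head record has the smaller timestamp,
    // or empty if one buffer is empty and the choice cannot be made safely yet.
    public static Optional<String> chooseInput(Deque<Long> bufferA, Deque<Long> bufferB) {
        if (bufferA.isEmpty() || bufferB.isEmpty()) {
            return Optional.empty(); // this is where the task may idle, waiting for data on the empty input
        }
        return Optional.of(bufferA.peekFirst() <= bufferB.peekFirst() ? "A" : "B");
    }

    public static void main(String[] args) {
        Deque<Long> a = new ArrayDeque<>(List.of(10L, 20L));
        Deque<Long> b = new ArrayDeque<>(List.of(15L));
        System.out.println(chooseInput(a, b)); // Optional[A], since 10 <= 15
    }
}
```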
[jira] [Resolved] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)
[ https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-12360. -- Resolution: Fixed > Improve documentation of max.task.idle.ms (kafka-streams) > - > > Key: KAFKA-12360 > URL: https://issues.apache.org/jira/browse/KAFKA-12360 > Project: Kafka > Issue Type: Sub-task > Components: docs, streams >Reporter: Domenico Delle Side >Assignee: John Roesler >Priority: Minor > Labels: beginner, newbie, trivial > > _max.task.idle.ms_ is an handy way to pause processing in a *_kafka-streams_* > application. This is very useful when you need to join two topics that are > out of sync, i.e when data in a topic may be produced _before_ you receive > join information in the other topic. > In the documentation, however, it is not specified that the value of > _max.task.idle.ms_ *must* be lower than _max.poll.intervall.ms_, otherwise > you'll incur into an endless rebalancing problem. > I think it is better to clearly state this in the documentation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
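A minimal Streams configuration sketch reflecting the guidance in this ticket, keeping max.task.idle.ms well below max.poll.interval.ms; the application id and bootstrap servers are placeholders.
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TaskIdleConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");         // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Wait up to 5s for data on an empty input before picking which partition to process next.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5_000L);
        // Keep the idle time well below the consumer's max.poll.interval.ms (default 300000 ms).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300_000);
        return props;
    }
}
```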
[jira] [Updated] (KAFKA-12977) Eliminate temporary ProducerStateManager in Log recovery logic
[ https://issues.apache.org/jira/browse/KAFKA-12977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12977: - Parent: KAFKA-12551 Issue Type: Sub-task (was: Improvement) > Eliminate temporary ProducerStateManager in Log recovery logic > -- > > Key: KAFKA-12977 > URL: https://issues.apache.org/jira/browse/KAFKA-12977 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > The temporary ProducerStateManager (PSM) instance created in the Log recovery > logic (inside LogLoader) is a source of complexity and confusion. For > example, when fixing KAFKA-12964 (see [PR# > 10896|https://github.com/apache/kafka/pull/10896]) we figured that there are > cases where the temporary PSM instance's state goes out of sync with the real > PSM instance (within LoadLogParams). And we need to adjust the code suitably > to handle for the consequences of these 2 instances being out of sync. To fix > this, we should just get rid of the temporary PSM instance which is used in > the following places: > # In LogLoader.recoverLog(), we could just pass in the real PSM. > # In LogLoader.completeSwapOperations(), we try to avoid recovering segment > here in [PR #10763|https://github.com/apache/kafka/pull/10763]. > # In LogLoader.loadSegmentFiles(), we probably need to clean this part of > the logic a bit. If we are missing index file or the index file is corrupted, > typically we can just rebuild the index without changing PSM. If the segment > is truncated while rebuilding the index, we actually want to follow the > process in step 1, by just removing the rest of the segments. So, we could > also get rid of the temporary PSM in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #11003: KAFKA-12360: Document new time semantics
vvcephei commented on pull request #11003: URL: https://github.com/apache/kafka/pull/11003#issuecomment-878602756 Merged and cherry-picked to 3.0 (cc @kkonstantine ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13068) Rename Log to UnifiedLog
Kowshik Prakasam created KAFKA-13068: Summary: Rename Log to UnifiedLog Key: KAFKA-13068 URL: https://issues.apache.org/jira/browse/KAFKA-13068 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Assignee: Kowshik Prakasam Once KAFKA-12554 is completed, we can rename Log -> UnifiedLog as described in the doc: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog
[ https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12554: - Description: Split Log layer into Log and LocalLog based on the proposal described in this document: [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. (was: Split Log layer into UnifiedLog and LocalLog based on the proposal described in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#.) > Split Log layer into Log and LocalLog > - > > Key: KAFKA-12554 > URL: https://issues.apache.org/jira/browse/KAFKA-12554 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > Split Log layer into Log and LocalLog based on the proposal described in this > document: > [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12554) Split Log layer into Log and LocalLog
[ https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-12554: - Summary: Split Log layer into Log and LocalLog (was: Split Log layer into UnifiedLog and LocalLog) > Split Log layer into Log and LocalLog > - > > Key: KAFKA-12554 > URL: https://issues.apache.org/jira/browse/KAFKA-12554 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > Split Log layer into UnifiedLog and LocalLog based on the proposal described > in this document: > https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei merged pull request #11003: KAFKA-12360: Document new time semantics
vvcephei merged pull request #11003: URL: https://github.com/apache/kafka/pull/11003 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #11003: KAFKA-12360: Document new time semantics
vvcephei commented on pull request #11003: URL: https://github.com/apache/kafka/pull/11003#issuecomment-878599816 Thanks, @JimGalasyn , @showuon , and @abbccdda ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default
hachikuji commented on a change in pull request #11011: URL: https://github.com/apache/kafka/pull/11011#discussion_r668258168 ## File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.message.DefaultPrincipalData; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; + +import java.nio.ByteBuffer; + +public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde { Review comment: Fair enough. @dielhennr Can we move this back to where it was? I guess I don't see a problem letting the test principal builder implementations extend `DefaultKafkaPrincipalBuilder` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11011: KAFKA-13051: Require Principal Serde and add default
hachikuji commented on a change in pull request #11011: URL: https://github.com/apache/kafka/pull/11011#discussion_r668258168 ## File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalSerde.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.message.DefaultPrincipalData; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; + +import java.nio.ByteBuffer; + +public interface DefaultKafkaPrincipalSerde extends KafkaPrincipalSerde { Review comment: Fair enough. @dielhennr Can we move this back to where it was? I guess I don't see a problem letting the test principal builder implementations extend `KafkaPrincipalBuilder` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes
[ https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista resolved KAFKA-13040. -- Resolution: Duplicate > Increase minimum value of segment.ms and segment.bytes > -- > > Key: KAFKA-13040 > URL: https://issues.apache.org/jira/browse/KAFKA-13040 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Badai Aqrandista >Priority: Minor > > Raised for KIP-760 (linked). > Many times, Kafka brokers in production crash with a "Too many open files" > error or "Out of memory" errors because some Kafka topics have a lot of > segment files as a result of a small {{segment.ms}} or {{segment.bytes}}. These > two configurations can be set by any user who is authorized to create a topic or > modify topic configuration. > To prevent these two configurations from causing a Kafka broker crash, they > should have a minimum value that is big enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms
[ https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista reopened KAFKA-7760: - Reopening issue and making this the main ticket for KIP-760 > Add broker configuration to set minimum value for segment.bytes and segment.ms > -- > > Key: KAFKA-7760 > URL: https://issues.apache.org/jira/browse/KAFKA-7760 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Badai Aqrandista >Priority: Major > Labels: kip, newbie > > If someone sets segment.bytes or segment.ms at the topic level to a very small > value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a > very high number of segment files. This can bring down the whole broker due > to hitting the maximum number of open files (for logs) or the maximum number of mmap-ed files > (for indexes). > To prevent that from happening, I would like to suggest adding two new items > to the broker configuration: > * min.topic.segment.bytes, defaults to 1048576: The minimum value for > segment.bytes. When someone sets topic configuration segment.bytes to a value > lower than this, Kafka throws an error INVALID VALUE. > * min.topic.segment.ms, defaults to 360: The minimum value for > segment.ms. When someone sets topic configuration segment.ms to a value lower > than this, Kafka throws an error INVALID VALUE. > Thanks > Badai -- This message was sent by Atlassian Jira (v8.3.4#803005)
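To make the hazard in these two tickets concrete, here is a sketch of how a client can create a topic with tiny segment settings today through the Admin API; the broker address and topic name are placeholders, and the min.topic.* broker settings proposed above would reject such values.
```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class TinySegmentTopicSketch {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Rolling a new segment every second (or every kilobyte) can explode the number of
            // open and mmap-ed files on the broker, which is the failure mode described above.
            NewTopic topic = new NewTopic("tiny-segments", 1, (short) 1)
                    .configs(Map.of(
                            TopicConfig.SEGMENT_MS_CONFIG, "1000",
                            TopicConfig.SEGMENT_BYTES_CONFIG, "1000"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```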
[GitHub] [kafka] guozhangwang commented on pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
guozhangwang commented on pull request #10986: URL: https://github.com/apache/kafka/pull/10986#issuecomment-878589565 @hachikuji I think the key idea behind this fix is that, if a rebalance failed with e.g. memberId lost, then conceptually we would just start a new rebalance in which we would call `onJoinPrepare` and in which we may call `onPartitionsRevoked` again. This behavior would be the same for eager or cooperative. Personally I think this fix is fine -- @ableegoldman if you could just add a unit test for the case of memberId lost during a first rebalance, and check that we would re-trigger `onJoinPrepare` again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request #11031: KAFKA-13067 Add internal config to lower the metadata log segment size
mumrah opened a new pull request #11031: URL: https://github.com/apache/kafka/pull/11031 In order to facilitate system and integration tests that use a smaller log segment size, we are adding this internal config to lower the minimum. During normal operation, this config will use the default size of 8Mb (as defined by KafkaRaftClient). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13003) KafkaBroker advertises socket port instead of the configured advertised port
[ https://issues.apache.org/jira/browse/KAFKA-13003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele resolved KAFKA-13003. Resolution: Fixed Pull Request #10935 has been merged. > KafkaBroker advertises socket port instead of the configured advertised port > > > Key: KAFKA-13003 > URL: https://issues.apache.org/jira/browse/KAFKA-13003 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Uwe Eisele >Assignee: Uwe Eisele >Priority: Critical > Labels: kip-500 > Fix For: 3.0.0 > > > In Kraft mode, Apache Kafka 2.8.0 advertises the socket port instead of > the configured advertised port. > A broker with the following configuration > {code:java} > listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091 > advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091 > {code} > advertises _envoy-kafka-broker:19092_ on the _PUBLIC_ listener; however, I > would expect _envoy-kafka-broker:9091_ to be advertised. In ZooKeeper mode > it works as expected. > In a deployment with an L4 proxy in front of the Kafka cluster, it is > important that the advertised port can be different from the actual socket > port. > I tested it with a Docker-Compose setup which runs 3 Kafka brokers in Kraft > mode and an Envoy proxy in front of them. With Apache Kafka 2.8.0 it does not > work, because Kafka does not advertise the configured advertised port. For > more details see: > https://github.com/ueisele/kafka/tree/fix/kraft-advertisedlisteners-build/proxy-examples/proxyl4-kafkakraft-bug-2.8 > _Client -- 909[1-3] --> Envoy Proxy -- 19092 --> Kafka Broker [1-3]_ > || Envoy Host || Envoy Port || Kafka Broker || Kafka Port || Advertised > Listener || > | envoy-kafka-broker | 9091 | kafka-broker1 | 19092 | envoy-kafka-broker:9091 > | > | envoy-kafka-broker | 9092 | kafka-broker2 | 19092 | envoy-kafka-broker:9092 > | > | envoy-kafka-broker | 9093 | kafka-broker3 | 19092 | envoy-kafka-broker:9093 > | > {code:bash} > > docker-compose exec kafkacat kafkacat -b envoy-kafka-broker:9091 -L > Metadata for all topics (from broker -1: envoy-kafka-broker:9091/bootstrap): > 3 brokers: > broker 101 at envoy-kafka-broker:19092 > broker 102 at envoy-kafka-broker:19092 (controller) > broker 103 at envoy-kafka-broker:19092 > 0 topics: > {code}
[GitHub] [kafka] cmccabe merged pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port
cmccabe merged pull request #10935: URL: https://github.com/apache/kafka/pull/10935 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12886) Enable request forwarding by default in 3.1
[ https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-12886: -- Fix Version/s: (was: 3.0.0) 3.1.0 > Enable request forwarding by default in 3.1 > --- > > Key: KAFKA-12886 > URL: https://issues.apache.org/jira/browse/KAFKA-12886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Ryan Dielhenn >Priority: Major > Fix For: 3.1.0 > > > KIP-590 documents that request forwarding will be enabled in 3.0 by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. > This makes it a requirement for users with custom principal implementations > to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 > because we saw this as a compatibility break. > The KIP documents that use of forwarding will be controlled by the IBP. So > once the IBP has been configured to 3.0 or above, then the brokers will begin > forwarding. > (Note that forwarding has always been a requirement for kraft.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12886) Enable request forwarding by default in 3.1
[ https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379403#comment-17379403 ] Ryan Dielhenn commented on KAFKA-12886: --- [~kkonstantine] targeting 3.1 now > Enable request forwarding by default in 3.1 > --- > > Key: KAFKA-12886 > URL: https://issues.apache.org/jira/browse/KAFKA-12886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Ryan Dielhenn >Priority: Major > Fix For: 3.0.0 > > > KIP-590 documents that request forwarding will be enabled in 3.0 by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. > This makes it a requirement for users with custom principal implementations > to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 > because we saw this as a compatibility break. > The KIP documents that use of forwarding will be controlled by the IBP. So > once the IBP has been configured to 3.0 or above, then the brokers will begin > forwarding. > (Note that forwarding has always been a requirement for kraft.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12886) Enable request forwarding by default in 3.1
[ https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-12886: -- Summary: Enable request forwarding by default in 3.1 (was: Enable request forwarding by default in 3.0) > Enable request forwarding by default in 3.1 > --- > > Key: KAFKA-12886 > URL: https://issues.apache.org/jira/browse/KAFKA-12886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Ryan Dielhenn >Priority: Major > Fix For: 3.0.0 > > > KIP-590 documents that request forwarding will be enabled in 3.0 by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. > This makes it a requirement for users with custom principal implementations > to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 > because we saw this as a compatibility break. > The KIP documents that use of forwarding will be controlled by the IBP. So > once the IBP has been configured to 3.0 or above, then the brokers will begin > forwarding. > (Note that forwarding has always been a requirement for kraft.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dielhennr opened a new pull request #11030: MINOR: Unmarking raft quorum configs as internal
dielhennr opened a new pull request #11030: URL: https://github.com/apache/kafka/pull/11030 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests
showuon opened a new pull request #11026: URL: https://github.com/apache/kafka/pull/11026 Some issues found in the ListConsumerGroupOffsetsHandler:
1. If a coordinator error is returned at the topic-partition level, we don't retry.
2. Possible partition-level exceptions are not handled.
This is the old handleResponse logic, FYR:
```java
void handleResponse(AbstractResponse abstractResponse) {
    final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
    final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();

    // If coordinator changed since we fetched it, retry
    if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
        Call call = getListConsumerGroupOffsetsCall(context);
        rescheduleFindCoordinatorTask(context, () -> call, this);
        return;
    }

    if (handleGroupRequestError(response.error(), context.future()))
        return;

    for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
        final TopicPartition topicPartition = entry.getKey();
        OffsetFetchResponse.PartitionData partitionData = entry.getValue();
        final Errors error = partitionData.error;

        if (error == Errors.NONE) {
            final Long offset = partitionData.offset;
            final String metadata = partitionData.metadata;
            final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
            // Negative offset indicates that the group has no committed offset for this partition
            if (offset < 0) {
                groupOffsetsListing.put(topicPartition, null);
            } else {
                groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
            }
        } else {
            log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
        }
    }
    context.future().complete(groupOffsetsListing);
}
```
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
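For reference, this is roughly how the handler is exercised from the public Admin API; the group id and broker address are placeholders. Per the old logic quoted above, partitions without a committed offset come back mapped to null.
```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ListGroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("example-group").partitionsToOffsetAndMetadata().get();
            offsets.forEach((tp, om) ->
                    System.out.println(tp + " -> " + (om == null ? "no committed offset" : om.offset())));
        }
    }
}
```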