[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
    Fix Version/s: (was: 2.8.1)
                   (was: 3.0.0)

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.0, 2.8.1
> Reporter: A. Sophie Blee-Goldman
> Priority: Blocker
> Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reopened KAFKA-13128:
    Assignee: (was: A. Sophie Blee-Goldman)

Failed again for a different reason, just flaky; it seems we need to wait for the thread to fully start up.

{{java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is (a string containing "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing "The state store, source-table, may have migrated to another instance") but: was "Cannot get state store source-table because the stream thread is STARTING, not RUNNING"}}

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.0, 2.8.1
> Reporter: A. Sophie Blee-Goldman
> Priority: Blocker
> Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
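The reopen comment suggests the test should wait until the newly added stream thread has fully started before it queries the store. A minimal sketch of such a wait loop in plain Java follows. `AwaitSketch` and `awaitCondition` are hypothetical names for illustration, not part of the Kafka test utilities, and the actual fix may look different.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class AwaitSketch {
    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A test could pass a condition that checks that the thread state is RUNNING, so that the STARTING and PARTITIONS_ASSIGNED states seen in the failure message are waited out rather than asserted on.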
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683960767

## File path: core/src/main/scala/kafka/raft/RaftManager.scala ##
@@ -214,6 +220,12 @@ class KafkaRaftManager[T](
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
+  // visible for testing cleanup
+  private[raft] def deleteDataDir(): Unit = {
+    val logDirName = Log.logDirName(topicPartition)
+    KafkaRaftManager.deleteLogDirectory(new File(config.metadataLogDir), logDirName)

Review comment:
@jsancio What do you think about exposing this API to the package for cleaning up log directories in tests? I'm not sure exactly how to override `createDataDir` with `tempDir` from the tests. It is called internally in `KafkaRaftManager` without an argument.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#issuecomment-893993452

LGTM! Please feel free to merge after green builds.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938654

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##
@@ -67,13 +72,31 @@ public class NamedTopologyIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
+    // TODO KAFKA-12648:

Review comment:
SG.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938587

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
> I was referring to moving the `builders` map to the `TopologyVersion` in the above, ie I want to save that for Pt. 4 if that's ok

Yup, totally.
[GitHub] [kafka] showuon commented on a change in pull request #11175: MINOR: Fix getting started documentation
showuon commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r683905633

## File path: config/kraft/README.md ##
@@ -26,7 +27,7 @@ xtzWWN4bTjitpL3kfd9s5g
 
 ## Format Storage Directories
 
-The next step is to format your storage directories. If you are running in single-node mode, you can do this with one command:
+The next step is to format your storage directories. If you are running in single-node mode, you can do this with one command. The string `<uuid>` should be replaced with the value returned by `kafka-storage.sh random-uuid`:
 
 $ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties

Review comment:
How about we put the last sentence as a comment above the command line? That is:

# The string `<uuid>` should be replaced with the value returned by `kafka-storage.sh random-uuid`
$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683902306

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -517,9 +526,9 @@ void handleRevocation(final Collection revokedPartitions) {
         }
 
         if (!remainingRevokedPartitions.isEmpty()) {
-            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+            log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially " +

Review comment:
Making this debug since `warn` seems too intense, and I'm not sure it's even worthy of `info` -- also, with named topologies you would expect to see this almost every time a topology is removed, since the thread will try to close those tasks as soon as it notices the topology's removal.
[GitHub] [kafka] showuon commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
showuon commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r683900989

## File path: docs/streams/upgrade-guide.html ##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
 restart all new ({{fullDotVersion}}) application instances
 
+
+Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files.
+This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB,

Review comment:
nit: Should we put the summary of this paragraph in the first sentence? As you can see, the first sentence of the paragraph above is `Upgrading from any older version to 2.8.0 is possible: `, which makes it clear that the paragraph is about upgrading. In this paragraph, the main point we want to deliver is "When downgrading from 3.0.0 to older versions, additional work needs to be done". So we might put this main point at the front, and then explain why the extra work is needed, to make it clear. What do you think? (BTW, I'm OK with the current version :) )
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683901855

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -775,26 +806,16 @@ private void completeTaskCloseClean(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference firstException = new AtomicReference<>(null);
 
-        final Set tasksToCloseDirty = new HashSet<>();

Review comment:
No actual changes here, just pulled the cleanup of tasks out into a separate new `#closeAndCleanUpTasks` method so we can call that on tasks from removed topologies.
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683894514

## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ##
@@ -262,6 +262,8 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")

Review comment:
`props.put`

## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ##
@@ -96,6 +96,7 @@ class ControllerApisTest {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    props.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")

Review comment:
Remove the braces around `nodeId` in the string interpolation.

## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ##
@@ -132,6 +132,8 @@ class KafkaApisTest {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
+      val voterId = (brokerId + 1)
+      properties.setProperty(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")

Review comment:
`props.put`

## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ##
@@ -1091,6 +1093,7 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")

Review comment:
Use `props.put` to stay consistent with the rest of the method.
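The test properties above follow the rule this pull request validates: a node with the `controller` role must list its own node id among the quorum voters, while a broker-only node must not (hence `brokerId + 1` as the voter id in `KafkaApisTest`). A minimal Java sketch of that check follows. `QuorumVoterCheck`, `parseVoterIds` and `validate` are hypothetical names, and the comma-separated `id@host:port` format is an assumption; this is not the `KafkaConfig` or `RaftConfig` implementation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class QuorumVoterCheck {
    /** Extracts voter ids from a string such as "1@localhost:9093,2@localhost:9094" (assumed format). */
    static Set<Integer> parseVoterIds(String voters) {
        Set<Integer> ids = new LinkedHashSet<>();
        if (voters == null || voters.trim().isEmpty()) {
            return ids;
        }
        for (String entry : voters.split(",")) {
            String trimmed = entry.trim();
            int at = trimmed.indexOf('@');
            if (at <= 0) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + trimmed);
            }
            ids.add(Integer.parseInt(trimmed.substring(0, at)));
        }
        return ids;
    }

    /** Throws IllegalArgumentException if the node id and roles are inconsistent with the voter set. */
    static void validate(Set<String> processRoles, int nodeId, String voters) {
        Set<Integer> voterIds = parseVoterIds(voters);
        if (voterIds.isEmpty()) {
            throw new IllegalArgumentException("The voter list must contain a parseable set of voters");
        }
        boolean isController = processRoles.contains("controller");
        if (isController && !voterIds.contains(nodeId)) {
            // Controllers must appear in the voter set under their own node id.
            throw new IllegalArgumentException("Controller node id " + nodeId + " must be in voters " + voterIds);
        }
        if (!isController && voterIds.contains(nodeId)) {
            // Broker-only nodes must not reuse a voter's id.
            throw new IllegalArgumentException("Broker node id " + nodeId + " must not be in voters " + voterIds);
        }
    }
}
```

With these definitions, a controller with node id 1 and voters `1@localhost:9092` passes, and a broker with node id 2 and voters `2@localhost:9093` is rejected.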
[GitHub] [kafka] zhaohaidao commented on pull request #11182: KAFKA-13074: Implement mayClean for MockLog
zhaohaidao commented on pull request #11182:
URL: https://github.com/apache/kafka/pull/11182#issuecomment-893935065

@jsancio Could you please review this PR if you have time? Thanks.
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683880423

## File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala ##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
I'll switch it to use `tempDir` shortly.
[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-13132.
-
    Resolution: Fixed

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
> Issue Type: Bug
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.0.0
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was
> inadvertently introduced. Now, topic IDs will only be assigned on the load of
> the log to a partition in LISR requests. This means we will only assign topic
> IDs for newly created topics/partitions, on broker startup, or potentially
> when a partition is reassigned.
>
> In the case of upgrading from an IBP before 2.8, we may have a scenario where
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (I.e., the controller
> is IBP < 2.8 and all other brokers are on the newest IBP.) Upon the last
> broker upgrading, we will elect a new controller, but its LISR request will
> not result in topic IDs being assigned to logs of existing topics. They will
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the
> controller/ZK to all new and pre-existing topics and will show up in
> metadata.* This means we no longer have the same guarantees we had in 2.8.
> *It is just the LISR/partition.metadata part of the code that is affected.*
>
> The problem is two-fold:
> 1. We ignore LISR requests when the partition leader epoch has not increased
> (previously we assigned the ID before this check).
> 2. We only assign the topic ID when we are associating the log with the
> partition in the replica manager for the first time. In the scenario
> described above, however, we have logs already associated with partitions
> that need to be upgraded.
>
> We should check whether the LISR request results in a topic ID addition
> and add logic for logs already associated with partitions in the replica manager.
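The two-fold problem and the proposed check can be illustrated with a small decision function. This is only a sketch of the idea described in the issue, written in Java with hypothetical names (`TopicIdUpgradeSketch`, `Action`, `decide`); it is not the replica manager code and the merged fix may be structured differently.

```java
import java.util.Optional;
import java.util.UUID;

final class TopicIdUpgradeSketch {
    enum Action { APPLY_LEADER_AND_ISR, ASSIGN_TOPIC_ID_ONLY, IGNORE }

    /**
     * Decides how to treat a LISR request for a partition whose log is already loaded.
     * An epoch bump is handled as usual; an unchanged epoch is still acted on when the
     * request carries a topic ID and the existing log does not have one yet.
     */
    static Action decide(int requestLeaderEpoch,
                         int currentLeaderEpoch,
                         Optional<UUID> requestTopicId,
                         Optional<UUID> logTopicId) {
        if (requestLeaderEpoch > currentLeaderEpoch) {
            return Action.APPLY_LEADER_AND_ISR;
        }
        if (requestLeaderEpoch == currentLeaderEpoch
                && requestTopicId.isPresent()
                && !logTopicId.isPresent()) {
            // Covers the upgrade gap: same epoch, but the log still needs its topic ID.
            return Action.ASSIGN_TOPIC_ID_ONLY;
        }
        return Action.IGNORE;
    }
}
```

The middle branch is the part the issue asks for: without it, a request with an unchanged leader epoch is dropped before the topic ID can reach a log that was associated with its partition earlier.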
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683879932

## File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala ##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
I did this using `scala.reflect.io.Directory.deleteRecursively` as you were reviewing.
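For readers who want the same cleanup without the Scala helper, a recursive delete can be sketched with `java.nio.file`. `TestDirCleanup` and its `deleteRecursively` method are hypothetical names for illustration and are not what the pull request uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class TestDirCleanup {
    /** Deletes the directory tree rooted at the given path; does nothing if it does not exist. */
    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling this from a per-test teardown on the directory created for each test would keep log directories from leaking between test cases.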
[GitHub] [kafka] hachikuji merged pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
hachikuji merged pull request #11171:
URL: https://github.com/apache/kafka/pull/11171
[GitHub] [kafka] ableegoldman commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
ableegoldman commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r683876709

## File path: docs/streams/upgrade-guide.html ##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
 restart all new ({{fullDotVersion}}) application instances
 
+
+Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files.
+This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB,
+and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight.
+Users need to wipe out the local RocksDB state stores written by the new versioned Kafka Streams before swapping in the
+older versioned Kafka Streams bytecode, which when then restore the state stores with the old versioned footer from the

Review comment:
```suggestion
older versioned Kafka Streams bytecode, which would then restore the state stores with the old versioned footer from the
```
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683874039

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -719,6 +723,8 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
Ack (although note that there's no wasted work on the restore phase, since there's by definition nothing for the thread to do yet, as it won't have been assigned any new tasks until it polls again). For that reason I don't think it really matters much where we put this, except for the case in which we start up with no topology -- then it's a waste to join the group in the first place, so we may as well wait until we receive something to work on. So yes, I'll move it back ahead of poll.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683872205

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
> I'll revisit the best way to handle the builders map in that as well

I was referring to moving the `builders` map to the `TopologyVersion` in the above, ie I want to save that for Pt. 4 if that's ok
[jira] [Created] (KAFKA-13174) Log Compaction Blocked Forever by Unstable Offset/Unclosed Transaction
Michael Jaschob created KAFKA-13174:
---
    Summary: Log Compaction Blocked Forever by Unstable Offset/Unclosed Transaction
    Key: KAFKA-13174
    URL: https://issues.apache.org/jira/browse/KAFKA-13174
    Project: Kafka
    Issue Type: Bug
    Components: core, log cleaner
    Affects Versions: 2.5.1
    Reporter: Michael Jaschob

Our production cluster experienced a single __consumer_offsets partition that was growing without ever being compacted. A closer inspection of the cleaner logs showed that the log was forever uncleanable at an offset from July 28, which had been written ~7 days previously:

{code:java}
[2021-08-02 19:08:39,650] DEBUG Finding range of cleanable offsets for log=__consumer_offsets-9. Last clean offset=Some(75795702964) now=1627956519650 => firstDirtyOffset=75795702964 firstUncleanableOffset=75868740168 activeSegment.baseOffset=76691318694 (kafka.log.LogCleanerManager$)
{code}

Using the log dumper tool, we were able to examine the records/batches around this offset and determined that the proximate cause was an "open" transaction that was never committed or aborted.
We saw this:
- a consumer group offset commit for group {{foo-group}} for topic-partition {{foo-topic-46}} from pid 1686325 (log offset 75868731509)
- a transactional COMMIT marker from pid 1686325 (log offset 75868731579)
- another consumer group offset commit for group {{foo-group}} for topic-partition {{foo-topic-46}} for pid 1686325 (log offset 75868740168, our first uncleanable offset)

Here's the raw log dumper output:

{code:java}
baseOffset: 75868731509 lastOffset: 75868731509 count: 1 baseSequence: 0 lastSequence: 0 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: false position: 98725764 CreateTime: 1627495733656 size: 232 magic: 2 compresscodec: NONE crc: 485368943 isvalid: true
| offset: 75868731509 CreateTime: 1627495733656 keysize: 126 valuesize: 36 sequence: 0 headerKeys: [] key: offset_commit::group=foo-group,partition=foo-topic-46 payload: offset=59016695,metadata=AQAAAXruS8Fg
...
baseOffset: 75868731579 lastOffset: 75868731579 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: true position: 98732634 CreateTime: 1627495733700 size: 78 magic: 2 compresscodec: NONE crc: 785483064 isvalid: true
| offset: 75868731579 CreateTime: 1627495733700 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 143
...
baseOffset: 75868740168 lastOffset: 75868740168 count: 1 baseSequence: 0 lastSequence: 0 producerId: 1686325 producerEpoch: 249 partitionLeaderEpoch: 320 isTransactional: true isControl: false position: 99599843 CreateTime: 1627495737629 size: 232 magic: 2 compresscodec: NONE crc: 1222829008 isvalid: true
| offset: 75868740168 CreateTime: 1627495737629 keysize: 126 valuesize: 36 sequence: 0 headerKeys: [] key: offset_commit::group=foo-group,partition=foo-topic-46 payload: offset=59016695,metadata=AQAAAXruS8Fg
...
{code}

There was no further activity from that pid 1686325.
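The dump above can be read mechanically: a transactional data batch opens a transaction for its producer if none is open, and a transactional control batch (the COMMIT or ABORT marker) closes it. The Java sketch below applies that reading to find producers left with an open transaction. `OpenTxnScan` and `Batch` are hypothetical names that carry only fields visible in the dump; this is an illustration of the reasoning, not broker code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OpenTxnScan {
    /** Hypothetical holder for the few dump fields this sketch needs. */
    static final class Batch {
        final long baseOffset;
        final long producerId;
        final boolean isTransactional;
        final boolean isControl;

        Batch(long baseOffset, long producerId, boolean isTransactional, boolean isControl) {
            this.baseOffset = baseOffset;
            this.producerId = producerId;
            this.isTransactional = isTransactional;
            this.isControl = isControl;
        }
    }

    /** Maps each producer id with an open transaction to that transaction's first offset. */
    static Map<Long, Long> openTransactions(List<Batch> batchesInOffsetOrder) {
        Map<Long, Long> firstOffsetByProducer = new LinkedHashMap<>();
        for (Batch batch : batchesInOffsetOrder) {
            if (!batch.isTransactional) {
                continue; // non-transactional batches never open or close a transaction
            }
            if (batch.isControl) {
                firstOffsetByProducer.remove(batch.producerId); // end marker closes the transaction
            } else {
                firstOffsetByProducer.putIfAbsent(batch.producerId, batch.baseOffset);
            }
        }
        return firstOffsetByProducer;
    }

    public static void main(String[] args) {
        List<Batch> batches = new ArrayList<>();
        batches.add(new Batch(75868731509L, 1686325L, true, false)); // offset commit
        batches.add(new Batch(75868731579L, 1686325L, true, true));  // COMMIT marker
        batches.add(new Batch(75868740168L, 1686325L, true, false)); // offset commit, never closed
        System.out.println(openTransactions(batches)); // prints {1686325=75868740168}
    }
}
```

For the three batches in the dump, the only open transaction starts at offset 75868740168, which is the `firstUncleanableOffset` reported in the cleaner log.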
In fact, the KStream application in question stalled on that partition because of this unstable offset/open transaction:

{{The following partitions still have unstable offsets which are not cleared on the broker side: [foo-topic-46], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log}}

We then captured the producer snapshot file from the broker data directory and wrote a quick tool to dump it as text. From its output, we found that the transactional producer in question (pid 1686325) was still considered alive with its hanging transaction at 75868740168:

{code:java}
ArraySeq(ProducerStateEntry(producerId=1686325, producerEpoch=249, currentTxnFirstOffset=Some(75868740168), coordinatorEpoch=143, lastTimestamp=1627495737629, batchMetadata=Queue(BatchMetadata(firstSeq=0, lastSeq=0, firstOffset=75868740168, lastOffset=75868740168, timestamp=1627495737629)))
{code}

This was very perplexing. As far as we can tell, the code in both Apache Kafka 2.5.1 and in trunk essentially treats an open transaction like we had as uncleanable, which in practice blocks the log from ever being compacted again, for all eternity. Once a pid has an open transaction - a defined {{currentTxnFirstOffset}} - {{ProducerStateManager}} will [never expire the producer|https://github.com/apache/kafka/blob/2.5.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L576-L577], even after {{transactional.id.expiration.ms}} has passed. This, obviously, has severe repercussions on a topic like __consumer_offsets (long coordinator load times, always-growing disk usage).

While we're not sure what led to this hanging open transaction (note: we were running partiti
[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683859800

## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ##
@@ -262,6 +262,8 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"1@localhost:9092")

Review comment:
This is a regular string. String interpolation `s"..."` is not needed. This comment applies to a few places.

## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ##
@@ -1949,6 +1951,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     )
   }

+  val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+  if (voterIds.nonEmpty) {
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters
+      require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+    } else {
+      // Ensure that the broker's node.id is not an id in controller.quorum.voters
+      require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+    }
+  } else if (usesSelfManagedQuorum) {
+    throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
+  }

Review comment:
```scala
if (usesSelfManagedQuorum) {
  val voterIds = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet
  if (voterIds.isEmpty) {
    throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
  } else if (processRoles.contains(ControllerRole)) {
    // Ensure that controllers use their node.id as a voter in controller.quorum.voters
    require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
  } else {
    // Ensure that the broker's node.id is not an id in controller.quorum.voters
    require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
  }
}
```

## File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala ##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._

 class RaftManagerTest {

-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
     def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
       val props = new Properties
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val nodeIdMod = (nodeId.toInt + 1)
+          props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeIdMod.toString}@localhost:9093")

Review comment:
String interpolation already calls `toString`, so you don't need to call it explicitly.

## File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala ##
@@ -34,21 +34,30 @@ import org.mockito.Mockito._

 class RaftManagerTest {

-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
The issue is that after every test you need to delete the directory create
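The validation order suggested in the review above (check for an empty voter set first, then apply the controller and broker rules) can be sketched outside of Kafka as follows. This is a minimal illustration in Java, not the code from the pull request: the class `QuorumVoterCheck`, the helper `parseVoterIds`, the use of `IllegalArgumentException` in place of `ConfigException`, and the assumption that a non-empty role set means a self-managed quorum are all hypothetical stand-ins.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class QuorumVoterCheck {

    // Hypothetical parser: pulls the ids out of a string such as
    // "1@localhost:9093,2@localhost:9094". It is NOT Kafka's RaftConfig parser.
    static Set<Integer> parseVoterIds(String quorumVoters) {
        Set<Integer> ids = new LinkedHashSet<>();
        if (quorumVoters == null || quorumVoters.trim().isEmpty()) {
            return ids;
        }
        for (String entry : quorumVoters.split(",")) {
            int at = entry.indexOf('@');
            if (at <= 0) {
                throw new IllegalArgumentException("Malformed voter entry: " + entry);
            }
            ids.add(Integer.parseInt(entry.substring(0, at).trim()));
        }
        return ids;
    }

    // Assumption: a non-empty role set means the node uses a self-managed quorum.
    static void validate(Set<String> processRoles, int nodeId, String quorumVoters) {
        if (processRoles.isEmpty()) {
            return; // nothing to validate in this sketch
        }
        Set<Integer> voterIds = parseVoterIds(quorumVoters);
        if (voterIds.isEmpty()) {
            throw new IllegalArgumentException("The voter list must contain a parseable set of voters");
        } else if (processRoles.contains("controller")) {
            // A controller must appear in the voter set under its own node id.
            if (!voterIds.contains(nodeId)) {
                throw new IllegalArgumentException("Controller node id " + nodeId + " is missing from voters " + voterIds);
            }
        } else if (voterIds.contains(nodeId)) {
            // A broker-only node must not reuse a voter id.
            throw new IllegalArgumentException("Broker node id " + nodeId + " must not be in voters " + voterIds);
        }
    }

    public static void main(String[] args) {
        validate(Set.of("controller"), 1, "1@localhost:9093"); // accepted
        validate(Set.of("broker"), 1, "2@localhost:9093");     // accepted
        try {
            validate(Set.of("broker"), 1, "1@localhost:9093");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Parsing the voters once inside the outer branch, as the reviewer proposes, keeps the empty-set error and the two role checks in a single decision chain instead of splitting them across an `if` and a trailing `else if`.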
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683851998

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }

+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
Well long story short I was trying to avoid mucking around in the task state management (which has historically been the source of many critical bugs) -- also we want to remove the `suspended` state soon anyways. But actually it seems simpler to just `close()` the tasks here and now altogether

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on pull request #11184: URL: https://github.com/apache/kafka/pull/11184#issuecomment-893876355 ping @ableegoldman @cadonna for reviews.
[GitHub] [kafka] guozhangwang opened a new pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang opened a new pull request #11184: URL: https://github.com/apache/kafka/pull/11184 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
[ https://issues.apache.org/jira/browse/KAFKA-13170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394358#comment-17394358 ] Guozhang Wang commented on KAFKA-13170: --- I think I know the reason for this flaky test. I have a PR, https://github.com/apache/kafka/pull/11155, for another case within this class, but it should be able to fix this one as well. [~ableegoldman] could you help me review that one? > Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown > -- > > Key: KAFKA-13170 > URL: https://issues.apache.org/jira/browse/KAFKA-13170 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Priority: Major > > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/] > {code:java} > Stacktrace java.lang.AssertionError: unexpected exception type thrown; > expected: but > was: > at org.junit.Assert.assertThrows(Assert.java:1020) > at org.junit.Assert.assertThrows(Assert.java:981) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");

     private final StreamsConfig config;
-    private final SortedMap builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
SGTM. What about the other comment, i.e. moving the `Map builders` into the `TopologyVersion` itself? Besides the constructors, the only modifiers of `builders` seem to be `register/deregister`, in which we would always try to `getAndIncrement` the version. So what about consolidating the modification of builders with the version bump, so that we would not need to use a `ConcurrentNavigableMap`?

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
         }
     }

+    /**
+     * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+     */
+    void handleTopologyUpdates() {
+        tasks.maybeCreateTasksFromNewTopologies();
+        for (final Task task : activeTaskIterable()) {
+            if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+                task.freezeProcessing();

Review comment:
Just another thought: if this is only for the very short temporary phase between when the topology is removed and when the rebalance is finally triggered to remove the tasks (which should usually be in the next poll), could we just call `task.suspend` instead of adding the new `freeze` logic? When we finally close the tasks we can still transition from suspended to closed?

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -719,6 +723,8 @@ void runOnce() {

         final long pollLatency = pollPhase();

+        topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
How about moving this ahead of `pollPhase()`? We are likely to be kicked out of the group while blocked waiting here, so it's better to be aware of that and re-join the group immediately, rather than still doing the restore etc., which may all be wasted work.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
         log.debug("Idempotent restore call done. Thread state has not changed.");
     }

+    // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+    private void checkForTopologyUpdates() {
+        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+            taskManager.handleTopologyUpdates();
+
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
very nit: just add a TODO that we can improve this case to not always enforce a rebalance when the version is bumped, in case we forget in future PRs?
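The version check in `checkForTopologyUpdates` follows a common pattern: a shared counter is bumped on every change, and each worker thread compares it against the last value it saw. A minimal sketch of that pattern, under the assumption that the counter is an `AtomicLong`, is given below. The class `TopologyVersionPoller` and its members are hypothetical names chosen for illustration; they are not part of Kafka Streams.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TopologyVersionPoller {
    private final AtomicLong sharedVersion; // bumped by whoever adds or removes a topology
    private long lastSeenVersion;           // only touched by the polling thread
    private int updatesHandled = 0;

    TopologyVersionPoller(AtomicLong sharedVersion) {
        this.sharedVersion = sharedVersion;
        this.lastSeenVersion = sharedVersion.get();
    }

    // Returns true when a newer version was observed and handled.
    boolean checkForUpdates() {
        long current = sharedVersion.get(); // read once so the comparison and the stored value agree
        if (lastSeenVersion < current) {
            lastSeenVersion = current;
            updatesHandled++;               // stand-in for "create tasks, trigger rebalance"
            return true;
        }
        return false;
    }

    int updatesHandled() {
        return updatesHandled;
    }

    public static void main(String[] args) {
        AtomicLong version = new AtomicLong(0);
        TopologyVersionPoller poller = new TopologyVersionPoller(version);

        System.out.println(poller.checkForUpdates()); // false: nothing changed yet
        version.getAndIncrement();                    // e.g. a topology was added
        version.getAndIncrement();                    // e.g. another one was removed
        System.out.println(poller.checkForUpdates()); // true: both bumps are handled in one pass
        System.out.println(poller.checkForUpdates()); // false: already caught up
    }
}
```

Because several bumps collapse into one handled update, this sketch reacts at most once per loop iteration, which is also why a rebalance on every detected bump may be more than strictly needed, as the nit above points out.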
[GitHub] [kafka] ableegoldman commented on pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120
ableegoldman commented on pull request #11183: URL: https://github.com/apache/kafka/pull/11183#issuecomment-893863088 Merged to trunk and cherry-picked to 3.0 to help stabilize system tests (cc @kkonstantine)
[GitHub] [kafka] ableegoldman merged pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120
ableegoldman merged pull request #11183: URL: https://github.com/apache/kafka/pull/11183
[GitHub] [kafka] ableegoldman commented on pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120
ableegoldman commented on pull request #11183: URL: https://github.com/apache/kafka/pull/11183#issuecomment-893861008 One unrelated flaky test failure in `kafka.api.PlaintextConsumerTest.testListTopics()`
[jira] [Commented] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394319#comment-17394319 ] Jason Gustafson commented on KAFKA-13173: - I upgraded this bug to a blocker because I think it can result in data loss. For example, in the example above, the second ISR change would be interpreted as an expansion, but there may have been committed writes to the log between the two ISR changes which were not reflected in the expansion. > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. 
So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
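The accumulation problem described in the ticket can be replayed with plain collections. The sketch below is only an illustration of the reported behavior, using the ticket's numbers (ISR [1, 2, 3], brokers 2 and 3 expiring together). The class `IsrFencingSketch` and both methods are hypothetical and do not reproduce the controller's actual code or the eventual fix.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class IsrFencingSketch {

    // Mirrors the reported behavior: every change is computed from the ORIGINAL ISR,
    // so the second change "forgets" that an earlier broker was already removed.
    static List<Set<Integer>> fenceIndependently(Set<Integer> isr, List<Integer> staleBrokers) {
        List<Set<Integer>> changes = new ArrayList<>();
        for (Integer broker : staleBrokers) {
            Set<Integer> next = new LinkedHashSet<>(isr);
            next.remove(broker);
            changes.add(next);
        }
        return changes;
    }

    // Carries the shrinking ISR forward, so each change builds on the previous one.
    static List<Set<Integer>> fenceAccumulating(Set<Integer> isr, List<Integer> staleBrokers) {
        List<Set<Integer>> changes = new ArrayList<>();
        Set<Integer> current = new LinkedHashSet<>(isr);
        for (Integer broker : staleBrokers) {
            current.remove(broker);
            changes.add(new LinkedHashSet<>(current)); // snapshot of the ISR after this fencing
        }
        return changes;
    }

    public static void main(String[] args) {
        Set<Integer> isr = new LinkedHashSet<>(List.of(1, 2, 3));
        List<Integer> stale = List.of(2, 3);

        System.out.println(fenceIndependently(isr, stale)); // prints [[1, 3], [1, 2]]
        System.out.println(fenceAccumulating(isr, stale));  // prints [[1, 3], [1]]
    }
}
```

The first output matches the `isr=[1, 3]` followed by `isr=[1, 2]` records quoted in the ticket. The second shows what accumulated changes would look like. Handling one broker per request, the simple solution the ticket proposes, sidesteps the issue because each request then starts from an already updated ISR.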
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Description: In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. was: In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. 
The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. 
The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei merged pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests
vvcephei merged pull request #11153: URL: https://github.com/apache/kafka/pull/11153
[jira] [Commented] (KAFKA-13172) Document in Streams 3.0 that due to RocksDB footer version in-flight downgrade is not supported
[ https://issues.apache.org/jira/browse/KAFKA-13172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394313#comment-17394313 ] Guozhang Wang commented on KAFKA-13172: --- Yes, that's correct. > Document in Streams 3.0 that due to RocksDB footer version in-flight > downgrade is not supported > --- > > Key: KAFKA-13172 > URL: https://issues.apache.org/jira/browse/KAFKA-13172 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Niket Goel reassigned KAFKA-13173: -- Assignee: Niket Goel > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Fix Version/s: 3.0.0 > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Priority: Blocker (was: Major) > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang merged pull request #11151: MINOR: Should commit a task if the consumer position advanced as well
guozhangwang merged pull request #11151: URL: https://github.com/apache/kafka/pull/11151
[jira] [Created] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
Jason Gustafson created KAFKA-13173: --- Summary: KRaft controller does not handle simultaneous broker expirations correctly Key: KAFKA-13173 URL: https://issues.apache.org/jira/browse/KAFKA-13173 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman merged pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest
ableegoldman merged pull request #11156: URL: https://github.com/apache/kafka/pull/11156
[GitHub] [kafka] ableegoldman commented on a change in pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest
ableegoldman commented on a change in pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#discussion_r683763243

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ##
@@ -142,12 +142,11 @@ private boolean allSubscriptionsEqual(Set allTopics,
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-
-                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
-                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                        if (otherConsumer == null) {
+                            // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
                         } else {
-                            String otherConsumer = allPreviousPartitionsToOwner.get(tp);
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
                                 + "same generation {}, this will be invalidated and removed from their previous assignment.",
                                      consumer, otherConsumer, tp, maxGeneration);

Review comment:
New logic makes sense to me 👍 -- I agree it doesn't really matter if we overwrite the previous consumer in the map since the thing we really care about is whether or not there was one
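The change discussed above relies on `Map.put` returning the value previously associated with the key, or `null` if there was none, which replaces a `containsKey` check followed by a separate `put` or `get`. A small stand-alone sketch of the idiom follows. The class `PartitionClaims` and the string keys are hypothetical simplifications, not the assignor's real types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionClaims {
    private final Map<String, String> ownerByPartition = new HashMap<>();
    private final List<String> conflicts = new ArrayList<>();

    // Returns true if this consumer is the first to claim the partition.
    boolean claim(String partition, String consumer) {
        // put() returns the previous owner, or null when the partition was unclaimed.
        // Values are never null in this sketch, so null reliably means "no prior owner".
        String previousOwner = ownerByPartition.put(partition, consumer);
        if (previousOwner == null) {
            return true;
        }
        conflicts.add(partition + ": " + previousOwner + " vs " + consumer);
        return false;
    }

    List<String> conflicts() {
        return conflicts;
    }

    public static void main(String[] args) {
        PartitionClaims claims = new PartitionClaims();
        System.out.println(claims.claim("topic-0", "consumerA")); // prints true
        System.out.println(claims.claim("topic-0", "consumerB")); // prints false
        System.out.println(claims.conflicts());                   // prints [topic-0: consumerA vs consumerB]
    }
}
```

As the review comment notes, the overwrite on a conflicting claim is harmless here, because the only fact that matters afterwards is whether a previous owner existed.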
[jira] [Commented] (KAFKA-13172) Document in Streams 3.0 that due to RocksDB footer version in-flight downgrade is not supported
[ https://issues.apache.org/jira/browse/KAFKA-13172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394279#comment-17394279 ] A. Sophie Blee-Goldman commented on KAFKA-13172: Just to clarify: a rolling downgrade is still possible; you would just need to clear the local state when bouncing each node. Is that correct? > Document in Streams 3.0 that due to RocksDB footer version in-flight > downgrade is not supported > --- > > Key: KAFKA-13172 > URL: https://issues.apache.org/jira/browse/KAFKA-13172 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jzaralim opened a new pull request #11183: MINOR: Increase the Kafka shutdown timeout to 120
jzaralim opened a new pull request #11183: URL: https://github.com/apache/kafka/pull/11183 The streams static membership test has failed several times due to hitting the Kafka shutdown timeout, but the logs were showing that the shutdown did actually succeed after the 60 second timeout. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13172) Document in Streams 3.0 that due to rocksDB footer version in-flight downgrade is not supported
Guozhang Wang created KAFKA-13172: - Summary: Document in Streams 3.0 that due to rocksDB footer version in-flight downgrade is not supported Key: KAFKA-13172 URL: https://issues.apache.org/jira/browse/KAFKA-13172 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Guozhang Wang
[GitHub] [kafka] mjsax commented on pull request #11172: MINOR: update stream-stream join docs
mjsax commented on pull request #11172: URL: https://github.com/apache/kafka/pull/11172#issuecomment-893689864 Merged to `trunk` and cherry-picked to `3.0` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #11172: MINOR: update stream-stream join docs
mjsax merged pull request #11172: URL: https://github.com/apache/kafka/pull/11172 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r683690243 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO ) } +val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty +if (voterIds.nonEmpty) { + if (processRoles.contains(ControllerRole)) { +// Ensure that controllers use their node.id as a voter in controller.quorum.voters +require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } else { +// Ensure that the broker's node.id is not an id in controller.quorum.voters +require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } +} + Review comment: Good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
hachikuji commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r683680618 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long, /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ def assignTopicId(topicId: Uuid): Unit = { +// defensively check that any newly assign topic ID matches any that is already set +_topicId.foreach { current => + if (!current.equals(topicId)) { +// we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower Review comment: nit: I think it's ok to leave this out. The point of adding the check is to reduce the coupling with ReplicaManager ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long, /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ def assignTopicId(topicId: Uuid): Unit = { +// defensively check that any newly assign topic ID matches any that is already set +_topicId.foreach { current => + if (!current.equals(topicId)) { +// we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower +throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," + + s"but log already contained topic ID $current") + } + // Topic ID already assigned so we can return + return Review comment: I was thinking how we could avoid this return. 
How about something like this:

```scala
_topicId match {
  case Some(currentId) =>
    if (!currentId.equals(topicId)) {
      throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
        s"but log already contained topic ID $currentId")
    }
  case None =>
    if (keepPartitionMetadataFile) {
      _topicId = Some(topicId)
      if (!partitionMetadataFile.exists()) {
        partitionMetadataFile.record(topicId)
        scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
      }
    }
}
```
[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog
junrao commented on a change in pull request #11154: URL: https://github.com/apache/kafka/pull/11154#discussion_r683680502 ## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ## @@ -1738,12 +1747,12 @@ object Log extends Logging { logDirFailureChannel: LogDirFailureChannel, lastShutdownClean: Boolean = true, topicId: Option[Uuid], -keepPartitionMetadataFile: Boolean): Log = { +keepPartitionMetadataFile: Boolean): UnifiedLog = { // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) -val topicPartition = Log.parseTopicPartitionName(dir) +val topicPartition = UnifiedLog.parseTopicPartitionName(dir) val segments = new LogSegments(topicPartition) -val leaderEpochCache = Log.maybeCreateLeaderEpochCache( +val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( Review comment: Should we change the logging prefix to UnifiedLog in line 1760 too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12616) Convert integration tests to use ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-12616: --- Fix Version/s: 3.1.0 > Convert integration tests to use ClusterTest > - > > Key: KAFKA-12616 > URL: https://issues.apache.org/jira/browse/KAFKA-12616 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: kip-500 > Fix For: 3.1.0 > > > We would like to convert integration tests to use the new ClusterTest > annotations so that we can easily test both the Zk and KRaft implementations. > This will require adding a bunch of support to the ClusterTest framework as > we go along. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
mumrah commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r68365 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO ) } +val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty +if (voterIds.nonEmpty) { + if (processRoles.contains(ControllerRole)) { +// Ensure that controllers use their node.id as a voter in controller.quorum.voters +require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } else { +// Ensure that the broker's node.id is not an id in controller.quorum.voters +require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } +} + Review comment: Right, but if the voters list _is_ empty, and we are in self-managed mode, I believe we should throw an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
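The check requested in this thread can be sketched as follows, in Java rather than the Scala of `KafkaConfig`, and with hypothetical names throughout (`QuorumVoterCheck`, `validate`, and the boolean parameters are illustrative assumptions, not the code in the pull request). It rejects an empty voter set first when the node runs in self-managed mode, so the controller and broker rules that follow are never skipped.

```java
import java.util.Set;

public class QuorumVoterCheck {
    /**
     * Validates node.id against the parsed voter ids.
     * Assumption: `usesSelfManagedQuorum` is true whenever process.roles is set.
     */
    static void validate(boolean usesSelfManagedQuorum,
                         boolean isController,
                         int nodeId,
                         Set<Integer> voterIds) {
        if (!usesSelfManagedQuorum) {
            return; // nothing to validate outside self-managed mode
        }
        // Reject an empty voter list up front so the role checks below always run
        if (voterIds.isEmpty()) {
            throw new IllegalArgumentException(
                "controller.quorum.voters must not be empty when process.roles is set");
        }
        if (isController) {
            // Controllers must appear in the voter set
            if (!voterIds.contains(nodeId)) {
                throw new IllegalArgumentException(
                    "node id " + nodeId + " must be included in the set of voters " + voterIds);
            }
        } else if (voterIds.contains(nodeId)) {
            // Broker-only nodes must not appear in the voter set
            throw new IllegalArgumentException(
                "node id " + nodeId + " must not be included in the set of voters " + voterIds);
        }
    }
}
```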
[jira] [Updated] (KAFKA-13168) KRaft observers should not have a replica id
[ https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-13168: - Fix Version/s: 3.1.0 > KRaft observers should not have a replica id > > > Key: KAFKA-13168 > URL: https://issues.apache.org/jira/browse/KAFKA-13168 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Ryan Dielhenn >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0, 3.1.0 > > > To prevent a misconfigured broker from affecting the quorum of the cluster > metadata partition, when a Kafka node is configured as broker-only the replica > id for the KRaft client should be set to {{Optional::empty()}}.
[jira] [Resolved] (KAFKA-13168) KRaft observers should not have a replica id
[ https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-13168. -- Resolution: Fixed > KRaft observers should not have a replica id > > > Key: KAFKA-13168 > URL: https://issues.apache.org/jira/browse/KAFKA-13168 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Ryan Dielhenn >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0, 3.1.0 > > > To prevent a misconfigured broker from affecting the quorum of the cluster > metadata partition, when a Kafka node is configured as broker-only the replica > id for the KRaft client should be set to {{Optional::empty()}}.
[jira] [Resolved] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown
[ https://issues.apache.org/jira/browse/KAFKA-13167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13167. - Fix Version/s: 3.0.0 Resolution: Fixed > KRaft broker should heartbeat immediately during controlled shutdown > > > Key: KAFKA-13167 > URL: https://issues.apache.org/jira/browse/KAFKA-13167 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > Controlled shutdown in KRaft is signaled through a heartbeat request with the > `shouldShutDown` flag set to true. When we begin controlled shutdown, we > should immediately schedule the next heartbeat instead of waiting for the > next periodic heartbeat, so that we can shut down more quickly. Otherwise > controlled shutdown can be delayed by several seconds.
[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r683626287 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO ) } +val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty +if (voterIds.nonEmpty) { + if (processRoles.contains(ControllerRole)) { +// Ensure that controllers use their node.id as a voter in controller.quorum.voters +require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } else { +// Ensure that the broker's node.id is not an id in controller.quorum.voters +require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } +} + Review comment: This code already checks for that on line 1955. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog
kowshik commented on a change in pull request #11154: URL: https://github.com/apache/kafka/pull/11154#discussion_r683616272 ## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ## @@ -248,16 +250,16 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { * will be deleted to avoid ID conflicts upon re-upgrade. */ @threadsafe -class Log(@volatile var logStartOffset: Long, - private val localLog: LocalLog, - brokerTopicStats: BrokerTopicStats, - val producerIdExpirationCheckIntervalMs: Int, - @volatile var leaderEpochCache: Option[LeaderEpochFileCache], - val producerStateManager: ProducerStateManager, - @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { +class UnifiedLog(@volatile var logStartOffset: Long, + private val localLog: LocalLog, + brokerTopicStats: BrokerTopicStats, + val producerIdExpirationCheckIntervalMs: Int, + @volatile var leaderEpochCache: Option[LeaderEpochFileCache], + val producerStateManager: ProducerStateManager, + @volatile private var _topicId: Option[Uuid], + val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { - import kafka.log.Log._ + import kafka.log.UnifiedLog._ Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests
wcarlson5 edited a comment on pull request #11153: URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087 @vvcephei looks like it is working now EDIT: nvm odd failure, I don't think that should be a failure. Trying to reproduce locally `java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is one of {, } but: was "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests
wcarlson5 edited a comment on pull request #11153: URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087 @vvcephei looks like it is working now EDIT: nvm odd failure `java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is one of {, } but: was "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests
wcarlson5 edited a comment on pull request #11153: URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087 @vvcephei looks like it is working now EDIT: nvm odd failure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #0: URL: https://github.com/apache/kafka/pull/0#discussion_r683573626 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: We will get a null pointer exception when we call `new KafkaConfig()` without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057 This happens everywhere in the codebase after this change: https://github.com/apache/kafka/pull/0/files#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R1437-R1438 Since `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP` is not a required configuration if we don't enable KIP-405, I think we should pass an empty string to RemoteLogManagerConfig if it is null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
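A minimal sketch of the null-safe default proposed in this comment. The helper below is a hand-rolled stand-in written for illustration; it is not Kafka's `AbstractConfig.originalsWithPrefix`, and the class name, method name, and config keys are hypothetical. It shows that substituting an empty string for a missing prefix avoids the null pointer exception, at the cost of matching every key.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixDefaults {
    /**
     * Returns the entries whose key starts with `prefix`, with the prefix removed.
     * A null prefix is treated as "", so every entry is returned unchanged.
     */
    static Map<String, Object> stripPrefix(Map<String, Object> originals, String prefix) {
        // Default a missing (null) prefix to the empty string instead of failing
        String effectivePrefix = prefix == null ? "" : prefix;
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            if (entry.getKey().startsWith(effectivePrefix)) {
                result.put(entry.getKey().substring(effectivePrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Because an empty prefix matches every key, a caller using this default receives all original properties rather than none; whether that is acceptable depends on how the resulting map is consumed.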
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #0: URL: https://github.com/apache/kafka/pull/0#discussion_r683573626 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: We will get a null pointer exception when we call `new KafkaConfig()` without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057 This happens everywhere in the codebase after this change: https://github.com/apache/kafka/pull/0/files#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R1437-R1438 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #0: URL: https://github.com/apache/kafka/pull/0#discussion_r683573626 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: We will get a null pointer exception when we call `new KafkaConfig()` without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057 This happens everywhere in the codebase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #0: URL: https://github.com/apache/kafka/pull/0#discussion_r683573626 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: We will get a null pointer exception everywhere when we call `new KafkaConfig()` without setting `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP`. E.g., https://github.com/apache/kafka/blob/d6f6edd2b17cbaccd4c87bfa66f871a676760044/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala#L1057 This happens everywhere in the codebase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao opened a new pull request #11182: KAFKA-13074: Implement mayClean for MockLog
zhaohaidao opened a new pull request #11182: URL: https://github.com/apache/kafka/pull/11182 The current implementation of MockLog doesn't implement maybeClean. MockLog is expected to have the same semantics as KafkaMetadataLog; several test suites, such as the raft simulation and the Kafka raft client test context, assume this. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah commented on pull request #11178: KAFKA-13168: KRaft observers should not have a replica id
mumrah commented on pull request #11178: URL: https://github.com/apache/kafka/pull/11178#issuecomment-893549648 cherry-picked to 3.0 as 765d2006a32d7a -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #11178: KAFKA-13168: KRaft observers should not have a replica id
mumrah merged pull request #11178: URL: https://github.com/apache/kafka/pull/11178 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
mumrah commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r683552057 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO ) } +val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty +if (voterIds.nonEmpty) { + if (processRoles.contains(ControllerRole)) { +// Ensure that controllers use their node.id as a voter in controller.quorum.voters +require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } else { +// Ensure that the broker's node.id is not an id in controller.quorum.voters +require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds") + } +} + Review comment: I think we need an additional check that the voter list is not empty if `process.roles` is set. Otherwise we will skip the validation that `node.id` is included in the voter list when we are a controller. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.
satishd commented on pull request #11058: URL: https://github.com/apache/kafka/pull/11058#issuecomment-893543468 @junrao: Please review this PR once https://github.com/apache/kafka/pull/11060 is reviewed and merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-12935. -- > Flaky Test > RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore > > > Key: KAFKA-12935 > URL: https://issues.apache.org/jira/browse/KAFKA-12935 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Walker Carlson >Priority: Critical > Labels: flaky-test > Fix For: 3.1.0 > > > {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13171) KIP-500 Setup and named docker volumes
[ https://issues.apache.org/jira/browse/KAFKA-13171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Claudio Carcaci updated KAFKA-13171: Priority: Trivial (was: Major) > KIP-500 Setup and named docker volumes > -- > > Key: KAFKA-13171 > URL: https://issues.apache.org/jira/browse/KAFKA-13171 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Claudio Carcaci >Priority: Trivial > > Following the KIP-500 instructions to enable KRaft mode > ([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the > command: > {code:java} > $ ./bin/kafka-storage.sh format -t -c ./config/kraft/server.properties > {code} > will create a "meta.properties" file in the logs folder (i.e. > /tmp/kraft-combined-logs). > If I build a Docker image with KRaft mode enabled and mount a named volume > on /tmp/kraft-combined-logs, the content of the folder will be overwritten > (emptied) by the Docker named volume content, so I will lose the > meta.properties file. > A possible solution would be to create the meta.properties file in a > separate folder and store all the created logs elsewhere, so that when > a named volume is mounted the logs folder can start empty.
[jira] [Updated] (KAFKA-13171) KIP-500 Setup and named docker volumes
[ https://issues.apache.org/jira/browse/KAFKA-13171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Claudio Carcaci updated KAFKA-13171: Priority: Major (was: Trivial) > KIP-500 Setup and named docker volumes > -- > > Key: KAFKA-13171 > URL: https://issues.apache.org/jira/browse/KAFKA-13171 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Claudio Carcaci >Priority: Major > > Following the KIP-500 instructions to enable KRaft mode > ([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the > command: > {code:java} > $ ./bin/kafka-storage.sh format -t -c ./config/kraft/server.properties > {code} > will create a "meta.properties" file in the logs folder (i.e. > /tmp/kraft-combined-logs). > If I build a Docker image with KRaft mode enabled and mount a named volume > on /tmp/kraft-combined-logs, the content of the folder will be overwritten > (emptied) by the Docker named volume content, so I will lose the > meta.properties file. > A possible solution would be to create the meta.properties file in a > separate folder and store all the created logs elsewhere, so that when a > named volume is mounted the logs folder can start empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13171) KIP-500 Setup and named docker volumes
Claudio Carcaci created KAFKA-13171: --- Summary: KIP-500 Setup and named docker volumes Key: KAFKA-13171 URL: https://issues.apache.org/jira/browse/KAFKA-13171 Project: Kafka Issue Type: Improvement Affects Versions: 2.8.0 Reporter: Claudio Carcaci Following the KIP-500 instructions to enable KRaft mode ([https://github.com/apache/kafka/blob/trunk/config/kraft/README.md]), the command: {code:java} $ ./bin/kafka-storage.sh format -t -c ./config/kraft/server.properties {code} will create a "meta.properties" file in the logs folder (i.e. /tmp/kraft-combined-logs). If I build a Docker image with KRaft mode enabled and mount a named volume on /tmp/kraft-combined-logs, the content of the folder will be overwritten (emptied) by the Docker named volume content, so I will lose the meta.properties file. A possible solution would be to create the meta.properties file in a separate folder and store all the created logs elsewhere, so that when a named volume is mounted the logs folder can start empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
mimaison commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-893504255 Thanks for the PR, it looks good. Is it possible to add a test for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #10807: KAFKA-12797: Log the evictor of fetch sessions
tombentley commented on a change in pull request #10807: URL: https://github.com/apache/kafka/pull/10807#discussion_r683481734 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -692,6 +692,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.version, fetchRequest.metadata, fetchRequest.isFromFollower, + s"clientId=${request.context.clientId}, principal=${request.context.principal}", Review comment: > It's not ideal that we have to build the string even if we don't use it. I couldn't see a nice way to do that. Basing it off the level of the `FetchSessionCache` logger seemed to break encapsulation, since at the call site we don't know how this string is going to be used. > > In practice, this extra logging is useful if there's a malicious user forcing sessions to roll or if a user is using a broken client (like Sarama 1.26.0). So I wonder if we really need the clientId. While it's nice to have, it's a user-controlled field so this could be problematic for large values. WDYT? It's a good point that the client controls the `clientId`, and could potentially choose a long one. The value of logging it here isn't that high, so I can remove it. If you're making a point about a clientId being maliciously chosen then that's a bigger problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection
mimaison commented on a change in pull request #10277: URL: https://github.com/apache/kafka/pull/10277#discussion_r683463480 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java ## @@ -489,7 +489,17 @@ boolean isCycle(String topic) { String source = replicationPolicy.topicSource(topic); if (source == null) { return false; -} else if (source.equals(sourceAndTarget.target())) { +} + +// Fix for https://issues.apache.org/jira/browse/KAFKA-9914 +final boolean condition; +if (replicationPolicy instanceof IdentityReplicationPolicy) { Review comment: This seems a bit brittle. Users could implement their own `ReplicationPolicy` that behaves like `IdentityReplicationPolicy` and this would not catch it. Can we detect what to do by making calls on the `replicationPolicy` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
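The suggestion above, deciding by calling the policy rather than checking its class, could look roughly like the following Java sketch. `PolicySketch` is a hypothetical stand-in with a single renaming method, not the real Kafka Connect `ReplicationPolicy` interface, and the probe topic name and helper are assumptions for illustration only.

```java
// Hypothetical stand-in for a replication policy; NOT the real Kafka Connect interface.
interface PolicySketch {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
}

class IdentityProbe {
    // Ask the policy to rename a sample topic. If the name comes back unchanged,
    // the policy behaves like an identity policy, whatever its concrete class is.
    static boolean behavesLikeIdentity(PolicySketch policy, String sourceAlias) {
        String probe = "__probe_topic"; // assumed sample name, illustration only
        return probe.equals(policy.formatRemoteTopic(sourceAlias, probe));
    }

    public static void main(String[] args) {
        PolicySketch prefixing = (alias, topic) -> alias + "." + topic;
        PolicySketch customIdentity = (alias, topic) -> topic; // user-defined, not a known class
        System.out.println(behavesLikeIdentity(prefixing, "us-west"));
        System.out.println(behavesLikeIdentity(customIdentity, "us-west"));
    }
}
```

A behavioural probe like this would also cover user-written policies that an `instanceof` check misses, at the cost of assuming the policy renames every topic the same way.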
[jira] [Assigned] (KAFKA-5966) Support ByteBuffer serialization in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-5966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-5966: Assignee: Luke Chen > Support ByteBuffer serialization in Kafka Streams > - > > Key: KAFKA-5966 > URL: https://issues.apache.org/jira/browse/KAFKA-5966 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Luke Chen >Priority: Major > > Currently Kafka Streams only supports serialization using byte arrays. This > means we generate a lot of garbage and spend unnecessary time copying bytes, > especially when working with windowed state stores that rely on composite > keys. In many places in the code we have to extract parts of the composite key > to deserialize either the timestamp or the message key from the state > store key (e.g. the methods in WindowStoreUtils). > Having support for serde into/from ByteBuffers would allow us to reuse the > underlying byte arrays and just pass around slices of the underlying buffers > to avoid the unnecessary copying. -- This message was sent by Atlassian Jira (v8.3.4#803005)
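To illustrate the idea of passing slices instead of copying, here is a minimal Java sketch. The composite layout (message key bytes followed by an 8-byte timestamp) and the class and method names are assumptions made for the example. They are not the actual Kafka Streams window key format or API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class CompositeKeySlices {
    static final int TIMESTAMP_BYTES = Long.BYTES;

    // Assumed layout for this sketch: [message key bytes][8-byte timestamp].
    static ByteBuffer compose(byte[] key, long timestamp) {
        ByteBuffer buf = ByteBuffer.allocate(key.length + TIMESTAMP_BYTES);
        buf.put(key).putLong(timestamp);
        buf.flip(); // position 0, limit = total length
        return buf;
    }

    // Absolute read of the trailing timestamp; nothing is copied.
    static long timestamp(ByteBuffer composite) {
        return composite.getLong(composite.limit() - TIMESTAMP_BYTES);
    }

    // A view over the key portion that shares the backing storage.
    static ByteBuffer keySlice(ByteBuffer composite) {
        ByteBuffer view = composite.duplicate(); // independent position and limit
        view.limit(composite.limit() - TIMESTAMP_BYTES);
        return view.slice();
    }

    public static void main(String[] args) {
        ByteBuffer composite = compose("user-1".getBytes(StandardCharsets.UTF_8), 1234L);
        System.out.println(timestamp(composite));
        System.out.println(StandardCharsets.UTF_8.decode(keySlice(composite)));
    }
}
```

With byte arrays, each of these extractions would typically allocate and copy a new array. Here both reads operate on the original buffer.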
[GitHub] [kafka] ijuma commented on pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ijuma commented on pull request #11110: URL: https://github.com/apache/kafka/pull/11110#issuecomment-893407158 Thanks for explaining the motivation @ccding. In my opinion, this is a bit confusing. What makes remote log configs special when compared to local log configs? The same arguments regarding reuse, etc. could be said for those, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #10807: KAFKA-12797: Log the evictor of fetch sessions
mimaison commented on a change in pull request #10807: URL: https://github.com/apache/kafka/pull/10807#discussion_r683367337 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -692,6 +692,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.version, fetchRequest.metadata, fetchRequest.isFromFollower, + s"clientId=${request.context.clientId}, principal=${request.context.principal}", Review comment: It's not ideal that we have to build the string even if we don't use it. In practice, this extra logging is useful if there's a malicious user forcing sessions to roll or if a user is using a broken client (like Sarama 1.26.0). So I wonder if we really need the clientId. While it's nice to have, it's a user-controlled field so this could be problematic for large values. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
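Both concerns raised here (building the string even when it is never logged, and an unbounded client-controlled `clientId`) can be illustrated with a small Java sketch. The code under review is Scala. The class name, method names, message text, and the 64-character cap below are assumptions for illustration and are not part of the PR.

```java
import java.util.function.Supplier;

class LazyEvictorDescription {
    // Arbitrary cap chosen for this sketch; the PR does not define one.
    static final int MAX_CLIENT_ID_CHARS = 64;

    // Bound a client-controlled value before it reaches the log.
    static String truncate(String value, int maxChars) {
        if (value == null) {
            return "null";
        }
        return value.length() <= maxChars ? value : value.substring(0, maxChars) + "...";
    }

    // Returns the log line, or null when logging is disabled.
    // The supplier is only invoked when debugEnabled is true.
    static String evictionLogLine(boolean debugEnabled, Supplier<String> evictor) {
        if (!debugEnabled) {
            return null;
        }
        return "Evicted fetch session on behalf of " + evictor.get();
    }

    public static void main(String[] args) {
        String clientId = "sarama";
        String principal = "User:alice";
        Supplier<String> evictor = () ->
            "clientId=" + truncate(clientId, MAX_CLIENT_ID_CHARS) + ", principal=" + principal;
        System.out.println(evictionLogLine(true, evictor));
        System.out.println(evictionLogLine(false, evictor));
    }
}
```

Deferring construction this way keeps the call site unaware of the logger's level, which is the encapsulation concern raised in the follow-up reply.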
[GitHub] [kafka] satishd commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
satishd commented on a change in pull request #11110: URL: https://github.com/apache/kafka/pull/11110#discussion_r683370891 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: What is the intent for this change here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog
junrao commented on a change in pull request #11154: URL: https://github.com/apache/kafka/pull/11154#discussion_r682801773 ## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ## @@ -248,16 +250,16 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { * will be deleted to avoid ID conflicts upon re-upgrade. */ @threadsafe -class Log(@volatile var logStartOffset: Long, - private val localLog: LocalLog, - brokerTopicStats: BrokerTopicStats, - val producerIdExpirationCheckIntervalMs: Int, - @volatile var leaderEpochCache: Option[LeaderEpochFileCache], - val producerStateManager: ProducerStateManager, - @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { +class UnifiedLog(@volatile var logStartOffset: Long, + private val localLog: LocalLog, + brokerTopicStats: BrokerTopicStats, + val producerIdExpirationCheckIntervalMs: Int, + @volatile var leaderEpochCache: Option[LeaderEpochFileCache], + val producerStateManager: ProducerStateManager, + @volatile private var _topicId: Option[Uuid], + val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { - import kafka.log.Log._ + import kafka.log.UnifiedLog._ Review comment: Should we rename the logging prefix to UnifiedLog too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] joel-hamill commented on pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config
joel-hamill commented on pull request #11163: URL: https://github.com/apache/kafka/pull/11163#issuecomment-892895687 LGTM, had one comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on pull request #10788: URL: https://github.com/apache/kafka/pull/10788#issuecomment-893124886 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah merged pull request #11166: URL: https://github.com/apache/kafka/pull/11166 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] axrj commented on pull request #8906: KAFKA-10190: To set replication throttling configs at broker entity-default
axrj commented on pull request #8906: URL: https://github.com/apache/kafka/pull/8906#issuecomment-892663172 Hey, Is there any plan to merge this? It would be great to have this patch backported too if possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11130: KAFKA-13138: FileConfigProvider#get should keep failure exception
cmccabe merged pull request #11130: URL: https://github.com/apache/kafka/pull/11130 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates
cmccabe merged pull request #11168: URL: https://github.com/apache/kafka/pull/11168 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #11150: MINOR: Fix missing word in LogLoader logged warning
ijuma merged pull request #11150: URL: https://github.com/apache/kafka/pull/11150 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r682637061 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -25,15 +25,17 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ -LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion +LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \ str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \ - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)] + str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), Review comment: It's probably a typo, but it didn't matter, since this was the wrong test to add new versions to, anyway. I backed out these changes and added the missing versions to streams_application_upgrade_test.py. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] hachikuji merged pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown
hachikuji merged pull request #11177: URL: https://github.com/apache/kafka/pull/11177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #11175: MINOR: Fix getting started documentation
rondagostino commented on a change in pull request #11175: URL: https://github.com/apache/kafka/pull/11175#discussion_r682834005 ## File path: config/kraft/README.md ## @@ -14,8 +14,9 @@ Most important of all, KRaft mode is more scalable. We expect to be able to [su # Quickstart ## Warning -KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.1 is released, -it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the preview release of KRaft mode. +KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into KRaft mode. +It is not possible to upgrade KRaft clusters from 2.8 to 3.0. Upgrading KRaft clusters from 3.0 to 3.1 will be supported. There may be bugs, including serious ones. Review comment: > Upgrading KRaft clusters from 3.0 to 3.1 will be supported. Is that guaranteed to be true? Would it be better to say something like "Upgrading KRaft clusters from 3.0 to 3.1 is likely to be supported, but this is not guaranteed." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates
cmccabe commented on a change in pull request #11168: URL: https://github.com/apache/kafka/pull/11168#discussion_r682803659 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -203,7 +203,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig, } tag.foreach { t => val newProperties = newImage.configs().configProperties(configResource) - dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties) +val maybeDefaultName = if (conf.usesSelfManagedQuorum) { Review comment: BrokerMetadataPublisher is only used when in KRaft mode, so this check is not necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
hachikuji commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r683019148 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long, /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ def assignTopicId(topicId: Uuid): Unit = { +// defensively check that any newly assign topic ID matches any that is already set +_topicId.foreach { current => + if (!current.equals(topicId)) Review comment: Can we shortcut return if the current topicId is already defined and matches the provided topicId? ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long, /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ def assignTopicId(topicId: Uuid): Unit = { +// defensively check that any newly assign topic ID matches any that is already set +_topicId.foreach { current => + if (!current.equals(topicId)) + // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower Review comment: nit: fix alignment (just use braces 😉 . I won't tell anyone) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
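The shortcut return suggested in the first review comment might be sketched in Java as below. The code under review is Scala, so this is only an analogy. `java.util.UUID` stands in for Kafka's `Uuid`, the write counter stands in for persisting partition metadata, and the exception type is an assumption because the diff excerpt does not show what is thrown.

```java
import java.util.UUID;

class TopicIdHolder {
    private UUID topicId;   // null until an ID has been assigned
    int metadataWrites = 0; // stands in for persisting the partition metadata file

    void assignTopicId(UUID newId) {
        if (topicId != null) {
            if (!topicId.equals(newId)) {
                // Exception type is an assumption; the diff excerpt is cut off before the throw.
                throw new IllegalStateException(
                    "Tried to assign topic ID " + newId + " but log already has " + topicId);
            }
            return; // same ID already set: nothing to do
        }
        topicId = newId;
        metadataWrites++;
    }

    public static void main(String[] args) {
        TopicIdHolder log = new TopicIdHolder();
        UUID id = UUID.randomUUID();
        log.assignTopicId(id);
        log.assignTopicId(id); // second call is a no-op
        System.out.println(log.metadataWrites);
    }
}
```

Using braces for both branches, as the second comment hints, also removes the alignment ambiguity of a brace-less nested `if`.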
[GitHub] [kafka] kowshik commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
kowshik commented on a change in pull request #11110: URL: https://github.com/apache/kafka/pull/11110#discussion_r683184053 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -253,9 +253,9 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) == null ? "" : config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), Review comment: Hmm, why is this change needed? It doesn't seem like the PR is altering behavior such as these but maybe I'm missing something. ## File path: core/src/main/scala/kafka/log/LogConfig.scala ## @@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { -val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class RemoteLogConfig { +val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) -// -2 indicates to derive value from retentionMs property. -if(localLogRetentionMs == -2) retentionMs -else { - // Added validation here to check the effective value should not be more than RetentionMs.
- if(localLogRetentionMs == -1 && retentionMs != -1) { -throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.") - } +val localRetentionMs: Long = { + val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) - if (localLogRetentionMs > retentionMs) { -throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.") - } + // -2 indicates to derive value from retentionMs property. + if(localLogRetentionMs == -2) retentionMs + else { +// Added validation here to check the effective value should not be more than RetentionMs. +if(localLogRetentionMs == -1 && retentionMs != -1) { + throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.") +} + +if (localLogRetentionMs > retentionMs) { + throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.") +} - localLogRetentionMs +localLogRetentionMs + } } - } - val localRetentionBytes: Long = { -val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp) +val localRetentionBytes: Long = { + val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp) -// -2 indicates to derive value from retentionSize property. -if(localLogRetentionBytes == -2) retentionSize; -else { - // Added validation here to check the effective value should not be more than RetentionBytes. - if(localLogRetentionBytes == -1 && retentionSize != -1) { -throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.") - } + // -2 indicates to derive value from retentionSize property. 
+ if(localLogRetentionBytes == -2) retentionSize; Review comment: nit: it seems like we could remove the semicolon at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
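The retention rule shown in the diff above (-2 derives the value from the overall retention setting, -1 is rejected unless the overall setting is also -1, and a value larger than the overall setting is rejected) can be restated as a small Java sketch. The class and method names are hypothetical, and `IllegalArgumentException` replaces Kafka's `ConfigException` so the example stays self-contained.

```java
class LocalRetentionSketch {
    // Mirrors the checks in the quoted Scala, in the same order.
    static long resolveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        if (localRetentionMs == -2) {
            return retentionMs; // -2 means "derive from the overall retention"
        }
        if (localRetentionMs == -1 && retentionMs != -1) {
            throw new IllegalArgumentException(
                "Value must not be -1 as retention is set as " + retentionMs);
        }
        if (localRetentionMs > retentionMs) {
            throw new IllegalArgumentException(
                "Value must not be more than the overall retention value");
        }
        return localRetentionMs;
    }

    public static void main(String[] args) {
        System.out.println(resolveLocalRetentionMs(-2, 604_800_000L));
        System.out.println(resolveLocalRetentionMs(3_600_000L, 604_800_000L));
    }
}
```

Read literally, the second check also rejects any non-negative local value when the overall retention is -1, since every such value is greater than -1. The sketch keeps that behaviour because it follows the diff as quoted.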
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr closed pull request #11159: MINOR: Change default node id in kraft broker properties
dielhennr closed pull request #11159: URL: https://github.com/apache/kafka/pull/11159 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #11110: URL: https://github.com/apache/kafka/pull/11110#discussion_r683068699 ## File path: core/src/main/scala/kafka/log/LogConfig.scala ## @@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { -val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class TieredLogConfig { Review comment: Done. Thanks for the comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-892521275 Pinging @vvcephei Would you be able to merge this before more commits get in the way and we need another rebase? Thanks a ton! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735 Spoke with @vvcephei about the streams smoke test and it is a known flaky. We are good-to-go with enabling KRaft for this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
junrao commented on a change in pull request #11110: URL: https://github.com/apache/kafka/pull/11110#discussion_r682820398 ## File path: core/src/main/scala/kafka/log/LogConfig.scala ## @@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { -val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class TieredLogConfig { Review comment: This probably should be named RemoteLogConfig to match RemoteLogManagerConfig? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan commented on pull request #11171: URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org