[GitHub] [kafka] ableegoldman commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows
ableegoldman commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887994503

It seems super awkward and likely to lead to confusion/future mistakes to use something called a `"SessionWindow"` in the "SlidingWindowAggregate", although yeah, it's pretty much exactly the same otherwise and can be reused. In other words, I totally agree with your proposal to just rename the existing Window implementations to describe the actual interval they represent, rather than some specific type of windowed operation that just happens to use them at the moment.

(In fact I had written that first paragraph before I even saw your comment with the renaming proposal, great minds think alike huh 😜)

That said, those names are just super clunky. Imagine trying to code something up with that...just takes too much mental processing. Maybe it's my inner physicist, but sometimes mathematical precision just isn't appropriate for real-world usage (don't tell any mathematicians I said that!)

Unfortunately I'm not crazy about any of the alternatives I can think up, maybe you can come up with some better ideas. Here's the best I could come up with:

`TimeWindow` --> `InclusiveExclusiveWindow`
`SessionWindow` / `SlidingWindow` --> `InclusiveInclusiveWindow`
`UnlimitedWindow` --> `InclusiveUnboundedWindow`

To me at least these feel more natural, ie it's clear what they mean without having to reference Wikipedia. I mean most people probably do know what open/closed mean, but inclusive/exclusive is more to the point. Also I think we can drop left/right and just imply it by the ordering.

Thoughts?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887989639

@mjsax @ableegoldman, I found we already have a `SessionWindow` to represent the closed time interval `[start,end]`, so I use it directly when creating the `SlidingWindows` aggregation windows. I think this is what we want, right?

I have another thought, which is to rename the time-interval-related windows. Currently, we have 3 types of time interval window:

`TimeWindow` -> to have `[start,end)` time interval
`SessionWindow` -> to have `[start,end]` time interval
`UnlimitedWindow` -> to have `[start, MAX_VALUE)` time interval

I think the name `SessionWindow` is definitely not good here, especially since we now want to use it in `SlidingWindows`, although it was only used for `SessionWindows` before. We should name them after the time interval they represent, not after the streaming window function that uses them. ex:

`TimeWindow` -> `LeftClosedRightOpenWindow`
`SessionWindow` -> `ClosedTimeWindow`
`UnlimitedWindow` -> `LeftClosedWindow`

ref: the `Classification of intervals` section in https://en.wikipedia.org/wiki/Interval_(mathematics)

Because these 3 window types are for internal use only, it is safe to rename them.

What do you think?
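For illustration, the three interval semantics listed in the comment above can be written as simple membership checks. This is only a minimal sketch under stated assumptions: `WindowIntervals` and its method names are hypothetical and are not the internal Kafka Streams window classes; only the interval bounds come from the comment.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class WindowIntervals {
    private WindowIntervals() { }

    // [start, end): the interval the comment attributes to TimeWindow.
    static boolean inLeftClosedRightOpen(long start, long end, long timestamp) {
        return start <= timestamp && timestamp < end;
    }

    // [start, end]: the interval the comment attributes to SessionWindow.
    static boolean inClosed(long start, long end, long timestamp) {
        return start <= timestamp && timestamp <= end;
    }

    // [start, MAX_VALUE): the interval the comment attributes to UnlimitedWindow.
    static boolean inLeftClosedUnbounded(long start, long timestamp) {
        return start <= timestamp && timestamp < Long.MAX_VALUE;
    }
}
```

The only difference between the first two checks is whether a record whose timestamp equals `end` belongs to the window, which is the distinction both naming proposals in this thread try to make visible.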
[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-887980048

@dajac, could you help take a look at this PR? Also, do we want to put this PR into v3.0? Thank you.
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r677941027 ## File path: core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala ## @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.{ApiVersion, KAFKA_2_8_IV0, KAFKA_3_0_IV1} +import kafka.network.SocketServer +import kafka.utils.TestUtils +import kafka.zk.ZkVersion +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.message.MetadataRequestData +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import scala.collection.{Map, Seq} + +class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest { + + override def brokerCount: Int = 3 + override def generateConfigs: Seq[KafkaConfig] = { +Seq( + createConfig(0, KAFKA_2_8_IV0), + createConfig(1, KAFKA_3_0_IV1), Review comment: I think this was discussed briefly before, but is there a reason KAFKA_3_0_IV1 was chosen? Should we just use the most recent IBP? 
(meaning, not even specify in the properties -- just pick up the default?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r677933885 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java ## @@ -32,28 +34,87 @@ */ @InterfaceStability.Evolving public class DescribeTopicsResult { -private final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DescribeTopicsResult(Map> futures) { -this.futures = futures; +protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); +if (topicIdFutures == null && nameFutures == null) +throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); +this.topicIdFutures = topicIdFutures; +this.nameFutures = nameFutures; +} + +static DescribeTopicsResult ofTopicIds(Map> topicIdFutures) { +return new DescribeTopicsResult(topicIdFutures, null); +} + +static DescribeTopicsResult ofTopicNames(Map> nameFutures) { +return new DescribeTopicsResult(null, nameFutures); +} + +/** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection + * + * @return a map from topic IDs to futures which can be used to check the status of + * individual topics if the request used topic IDs, otherwise return null. + */ +public Map> topicIdValues() { +return topicIdFutures; +} + +/** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection + * + * @return a map from topic names to futures which can be used to check the status of + * individual topics if the request used topic names, otherwise return null. + */ +public Map> topicNameValues() { +return nameFutures; } /** * Return a map from topic names to futures which can be used to check the status of * individual topics. 
*/ +@Deprecated public Map> values() { -return futures; +return nameFutures; } /** * Return a future which succeeds only if all the topic descriptions succeed. */ +@Deprecated public KafkaFuture> all() { -return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). +return all(nameFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +public KafkaFuture> allTopicNames() { +return all(nameFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +public KafkaFuture> allTopicIds() { +return all(topicIdFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +private static KafkaFuture> all(Map> futures) { + Review comment: nit: extra newline here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
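The construction in the diff above (exactly one of the two maps is non-null, enforced in the constructor and reached only through static factories) can be sketched in isolation. This is a simplified, hypothetical analogue rather than the Kafka class: `EitherKeyedResult` is an invented name, `CompletableFuture<String>` stands in for `KafkaFuture` of a topic description, and plain `String` keys stand in for topic IDs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified analogue of the result class under review.
final class EitherKeyedResult {
    private final Map<String, CompletableFuture<String>> byId;   // stand-in for topicIdFutures
    private final Map<String, CompletableFuture<String>> byName; // stand-in for nameFutures

    private EitherKeyedResult(Map<String, CompletableFuture<String>> byId,
                              Map<String, CompletableFuture<String>> byName) {
        // Exactly one of the two maps may be supplied.
        if (byId != null && byName != null)
            throw new IllegalArgumentException("byId and byName cannot both be specified.");
        if (byId == null && byName == null)
            throw new IllegalArgumentException("byId and byName cannot both be null.");
        this.byId = byId;
        this.byName = byName;
    }

    static EitherKeyedResult ofIds(Map<String, CompletableFuture<String>> byId) {
        return new EitherKeyedResult(byId, null);
    }

    static EitherKeyedResult ofNames(Map<String, CompletableFuture<String>> byName) {
        return new EitherKeyedResult(null, byName);
    }

    // Returns null when the request was keyed by name, mirroring the javadoc in the diff.
    Map<String, CompletableFuture<String>> idValues() {
        return byId;
    }

    // Returns null when the request was keyed by ID.
    Map<String, CompletableFuture<String>> nameValues() {
        return byName;
    }

    // Completes normally only once every future in the map has completed normally.
    static CompletableFuture<Void> all(Map<String, CompletableFuture<String>> futures) {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```

Keeping the constructor private and exposing only the two factories means callers cannot build an instance that violates the either/or invariant, at the cost of accessors that may return null.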
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r677933885 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java ## @@ -32,28 +34,87 @@ */ @InterfaceStability.Evolving public class DescribeTopicsResult { -private final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DescribeTopicsResult(Map> futures) { -this.futures = futures; +protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); +if (topicIdFutures == null && nameFutures == null) +throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); +this.topicIdFutures = topicIdFutures; +this.nameFutures = nameFutures; +} + +static DescribeTopicsResult ofTopicIds(Map> topicIdFutures) { +return new DescribeTopicsResult(topicIdFutures, null); +} + +static DescribeTopicsResult ofTopicNames(Map> nameFutures) { +return new DescribeTopicsResult(null, nameFutures); +} + +/** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection + * + * @return a map from topic IDs to futures which can be used to check the status of + * individual topics if the request used topic IDs, otherwise return null. + */ +public Map> topicIdValues() { +return topicIdFutures; +} + +/** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection + * + * @return a map from topic names to futures which can be used to check the status of + * individual topics if the request used topic names, otherwise return null. + */ +public Map> topicNameValues() { +return nameFutures; } /** * Return a map from topic names to futures which can be used to check the status of * individual topics. 
*/ +@Deprecated public Map> values() { -return futures; +return nameFutures; } /** * Return a future which succeeds only if all the topic descriptions succeed. */ +@Deprecated public KafkaFuture> all() { -return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). +return all(nameFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +public KafkaFuture> allTopicNames() { +return all(nameFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +public KafkaFuture> allTopicIds() { +return all(topicIdFutures); +} + +/** + * Return a future which succeeds only if all the topic descriptions succeed. + */ +private static KafkaFuture> all(Map> futures) { + Review comment: nit: extra space here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677932954

## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ##
@@ -303,7 +303,33 @@ default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
      * @param options    The options to use when describing the topic.
      * @return The DescribeTopicsResult.
      */
-    DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
+    default DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
+        return describeTopics(TopicCollection.ofTopicNames(topicNames), options);
+    }
+
+    /**
+     * This is a convenience method for {@link #describeTopics(TopicCollection, DescribeTopicsOptions)}
+     * with default options. See the overload for more details.
+     *
+     * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher.
+     *
+     * @param topics The topics to describe.
+     * @return The DescribeTopicsResult.
+     */
+    default DescribeTopicsResult describeTopics(TopicCollection topics) {
+        return describeTopics(topics, new DescribeTopicsOptions());
+    }
+
+    /**
+     * describe a batch of topics.

Review comment:
Can we adjust this javadoc to be `Describe some topics in the cluster.` like the previous API?
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677932361

## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ##
@@ -42,6 +34,14 @@
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;

Review comment:
nit: is there a reason we moved these imports?
[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-887966371

@dengziming -- @rajinisivaram can help in about 2 weeks. I will try to help review so it will be ready when she gets to it.
[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-887949936

@rajinisivaram Are you interested in taking this across the finish line? I think this is very close to a first version; we can try to land that and then improve on it, since this has been in progress for a long time. Thank you!
[GitHub] [kafka] hachikuji merged pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji merged pull request #11098:
URL: https://github.com/apache/kafka/pull/11098
[GitHub] [kafka] hachikuji commented on pull request #11116: KAFKA-13114: Revert state and reregister raft listener
hachikuji commented on pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#issuecomment-887935935

I'm trying to think of some approach for validating this logic. It is difficult because it is handling unexpected exceptions. One thought I had is implementing a poison message of some kind which could expire after some TTL. When the controller sees the poison message, it would check if it is still active and raise an exception accordingly. Something like that could be used in an integration test, which might be simpler than trying to induce a failure by mucking with internal state.

Another idea is to corrupt the log on one of the nodes, but I'm not sure this would hit the right path. In fact, this is probably a gap at the moment. If the batch reader fails during iteration, we should probably resign and perhaps even fail. I'll file a separate JIRA for this.

In any case, I think we should try to come up with some way to exercise this path. Otherwise it's hard to say if it even works (though it looks reasonable enough).
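The poison-message idea in the comment above could look roughly like the following. This is a purely hypothetical sketch: `PoisonMessage` and `PoisonAwareHandler` are invented names, nothing like them is known to exist in Kafka, and the real controller record-handling path is not modeled here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical test-only record that is "active" until its TTL elapses.
final class PoisonMessage {
    private final Instant expiresAt;

    PoisonMessage(Instant createdAt, Duration ttl) {
        this.expiresAt = createdAt.plus(ttl);
    }

    // True while the current time is strictly before the expiry instant.
    boolean isActive(Instant now) {
        return now.isBefore(expiresAt);
    }
}

// Hypothetical stand-in for the component that processes committed records.
final class PoisonAwareHandler {
    private PoisonAwareHandler() { }

    // Throws while the poison message is active, simulating an unexpected
    // failure; after expiry the message is ignored so processing can resume.
    static void handle(PoisonMessage message, Instant now) {
        if (message.isActive(now)) {
            throw new IllegalStateException("Poison message is still active");
        }
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic, so an integration test could drive both the failing and the recovered case without sleeping.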
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-13142:
---
    Fix Version/s: 3.0.0

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
> Issue Type: Task
> Components: kraft
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Blocker
> Fix For: 3.0.0
>
> The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388378#comment-17388378 ]

Matthias J. Sax commented on KAFKA-9897:

Works for me.

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 2.6.0
> Reporter: Matthias J. Sax
> Priority: Critical
> Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
> at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
> at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
> at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}
[GitHub] [kafka] hachikuji commented on pull request #11131: KAFKA-13137: KRaft Controller Metric MBean names incorrectly quoted
hachikuji commented on pull request #11131:
URL: https://github.com/apache/kafka/pull/11131#issuecomment-887924795

@rondagostino I've seen this test failing consistently: `kafka.controller.ControllerEventManagerTest.testMetricsCleanedOnClose()`. I tried it locally and it passes when run by itself. I have a suspicion that now that the JMX names have been fixed, there is some kind of registration conflict. Perhaps the new tests or some other test which involves the KRaft controller is leaving behind the mbeans. As we noted above, the de-registration logic has not been implemented for these metrics.
[GitHub] [kafka] niket-goel commented on pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel commented on pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#issuecomment-887915977

@jsancio Without the change (as per Jason's description in the JIRA https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an empty list of topics, making the user think that they had no topics. With the change, the describe and list topic APIs time out on the controller endpoint (same as create_topic).
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13142:
--
    Priority: Blocker (was: Major)

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
> Issue Type: Task
> Components: kraft
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Blocker
>
> The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13142:
--
    Priority: Major (was: Blocker)

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
> Issue Type: Task
> Components: kraft
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Major
>
> The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[jira] [Commented] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests
[ https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388367#comment-17388367 ]

Lucas Wang commented on KAFKA-10548:

Hi [~jolshan] thanks for the KIP-516 and for keeping track of the tasks.
At LinkedIn, we are very interested in adopting the better topic deletions.
Have you started/planned to work on this ticket? If not, do you mind if I take it over?

> Implement deletion logic for LeaderAndIsrRequests
> -
>
> Key: KAFKA-10548
> URL: https://issues.apache.org/jira/browse/KAFKA-10548
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Justine Olshan
> Priority: Major
>
> This will allow for specialized deletion logic when receiving LeaderAndIsrRequests
> Will also create and utilize delete.stale.topic.delay.ms configuration option
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13142:
--
    Description: The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored. (was: The KRaft controller is not currently validating dynamic configs. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.)

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
> Issue Type: Task
> Components: kraft
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Blocker
>
> The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13142:
--
    Summary: KRaft brokers do not validate dynamic configs before forwarding them to controller (was: KRaft controller does not validate dynamic configs)

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
> Issue Type: Task
> Components: kraft
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Blocker
>
> The KRaft controller is not currently validating dynamic configs. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677871792

## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ##
@@ -145,6 +145,14 @@ class ControllerApisTest {
     authorizer
   }
 
+  @Test
+  def testHanldleMetadata(): Unit = {
+    val caught = assertThrows(classOf[ApiException], () => createControllerApis(
+      None, new MockController.Builder().build()).

Review comment:
Was chasing a phantom. This was not an issue. Incorrect implementation on my part.
[GitHub] [kafka] jsancio commented on a change in pull request #11134: KAFKA-12851: Fix Raft partition simulation
jsancio commented on a change in pull request #11134:
URL: https://github.com/apache/kafka/pull/11134#discussion_r677868867

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ##
@@ -171,6 +171,12 @@ private boolean updateHighWatermark() {
                     || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset &&
                         !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata))) {
                     highWatermark = highWatermarkUpdateOpt;
+                    log.debug(

Review comment:
Yes. Updated the PR to use trace.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r677787551 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -859,27 +855,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, ClientMetrics.addVersionMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics); ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); -ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString()); +ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, this.topologyMetadata.topologyDescriptionString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads()); streamsMetadataState = new StreamsMetadataState( -internalTopologyBuilder, +this.topologyMetadata, Review comment: nit: seems misaligned. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,304 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the "__" (double underscore) string is not allowed for topology names, so it's safe to use to indicate +// that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.topologyName(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are named topologies but some are empty, this indicates a bug in user code +if (hasNamedTopologies()) { +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { Review comment: I kept getting myself confused by the `NoNonGlobal` haha (got me for the first time reviewing this, and then again for the third pass) :P As a hindsight maybe we should invent the term "local 
topology" in the first place. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ## @@ -93,24 +95,25 @@ public void init(final ProcessorCont
[GitHub] [kafka] jolshan commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
jolshan commented on a change in pull request #11126: URL: https://github.com/apache/kafka/pull/11126#discussion_r677854946 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1367,12 +1367,25 @@ class ReplicaManager(val config: KafkaConfig, val currentLeaderEpoch = partition.getLeaderEpoch val requestLeaderEpoch = partitionState.leaderEpoch val requestTopicId = topicIdFromRequest(topicPartition.topic) + val logTopicId = partition.topicId - if (!hasConsistentTopicId(requestTopicId, partition.topicId)) { -stateChangeLogger.error(s"Topic ID in memory: ${partition.topicId.get} does not" + + // When running a ZK controller and upgrading to topic IDs we may receive a request with leader epoch + // that is equal to the current leader epoch. In this case, we want to assign topic ID to the log. + def isUpgradingToTopicIdWithExistingLog: Boolean = { +requestLeaderEpoch == currentLeaderEpoch && Review comment: Ah yeah. That would work. I felt it was a bit odd too, but didn't think of this alternative. This placement is better.
[GitHub] [kafka] hachikuji commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
hachikuji commented on a change in pull request #11126: URL: https://github.com/apache/kafka/pull/11126#discussion_r677853531 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1367,12 +1367,25 @@ class ReplicaManager(val config: KafkaConfig, val currentLeaderEpoch = partition.getLeaderEpoch val requestLeaderEpoch = partitionState.leaderEpoch val requestTopicId = topicIdFromRequest(topicPartition.topic) + val logTopicId = partition.topicId - if (!hasConsistentTopicId(requestTopicId, partition.topicId)) { -stateChangeLogger.error(s"Topic ID in memory: ${partition.topicId.get} does not" + + // When running a ZK controller and upgrading to topic IDs we may receive a request with leader epoch + // that is equal to the current leader epoch. In this case, we want to assign topic ID to the log. + def isUpgradingToTopicIdWithExistingLog: Boolean = { +requestLeaderEpoch == currentLeaderEpoch && Review comment: Might be just me, but I find it a little awkward to see the epoch check nested here given the other checks below. Would it be reasonable instead to move this to after the inequality checks?

```scala
else if (requestLeaderEpoch < currentLeaderEpoch) {
  ...
} else {
  val error = requestTopicId match {
    case Some(topicId) if logTopicId.isEmpty =>
      // The controller may send LeaderAndIsr to update topicId without bumping the epoch
      log.assignTopicId(topicId)
      stateChangeLogger.info(s"Updating topicId for $log to $topicId from LeaderAndIsr request")
      Errors.NONE
    case _ =>
      stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
        s"controller $controllerId with correlation id $correlationId " +
        s"epoch $controllerEpoch for partition $topicPartition since its associated " +
        s"leader epoch $requestLeaderEpoch matches the current leader epoch")
      Errors.STALE_CONTROLLER_EPOCH
  }
  responseMap.put(topicPartition, error)
}
```
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r677847584 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int, maxBatchSize ) - partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) => + var breakIteration = false Review comment: I tried to consolidate this logic into a single loop, which I think is what you are suggesting. Let me know if the latest commit is what you had in mind.
[jira] [Commented] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE
[ https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388332#comment-17388332 ] Kalpesh Patel commented on KAFKA-13139: --- Thanks a lot [~rhauch] and [~kkonstantine] for taking care of the fix and unblocking the release. > Empty response after requesting to restart a connector without the tasks > results in NPE > --- > > Key: KAFKA-13139 > URL: https://issues.apache.org/jira/browse/KAFKA-13139 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 3.0.0 > > > After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart > only the connector (without any tasks) returns OK with an empty body. > As system test runs revealed, this causes an NPE in > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135] > We should return 204 (NO_CONTENT) instead. > This is a regression from previous behavior, therefore the ticket is marked > as a blocker candidate for 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
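To illustrate why the ticket prefers 204 (NO_CONTENT) over a 200 with an empty body, here is a minimal sketch. It assumes a hypothetical helper, `bodyToParse`, and is not the actual `RestClient` code from Kafka Connect: a caller that sees 204 knows there is nothing to deserialize, while a 200 with no body leaves it holding a null it may later dereference.

```java
import java.util.Optional;

// Illustrative only: a hypothetical helper, not Kafka Connect's RestClient.
final class NoContentSketch {
    static final int OK = 200;
    static final int NO_CONTENT = 204;

    /** Returns the body to deserialize, or empty when the status says there is none. */
    static Optional<String> bodyToParse(final int status, final String rawBody) {
        if (status == NO_CONTENT) {
            // Nothing was promised, so nothing is parsed and nothing can be null later.
            return Optional.empty();
        }
        if (rawBody == null || rawBody.isEmpty()) {
            // A status that implies a body, sent without one, is the ambiguous case.
            throw new IllegalStateException("status " + status + " implies a body, but none was sent");
        }
        return Optional.of(rawBody);
    }

    public static void main(final String[] args) {
        System.out.println(bodyToParse(NO_CONTENT, null).isPresent());
        System.out.println(bodyToParse(OK, "{\"name\":\"my-connector\"}").get());
    }
}
```

Failing loudly on the ambiguous case is a choice made for this sketch; the ticket itself only states that the server should return 204.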
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388324#comment-17388324 ] Kai Huang commented on KAFKA-12713: --- Twitter would be interested to pursue this work to debug latency issues. [~mingaliu], I wonder do you mind if I take over this JIRA ticket and [KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency]? > Report "REAL" follower/consumer fetch latency > - > > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug >Reporter: Ming Liu >Priority: Major > > The fetch latency is an important metrics to monitor for the cluster > performance. With ACK=ALL, the produce latency is affected primarily by > broker fetch latency. > However, currently the reported fetch latency didn't reflect the true fetch > latency because it sometimes need to stay in purgatory and wait for > replica.fetch.wait.max.ms when data is not available. This greatly affect the > real P50, P99 etc. > I like to propose a KIP to be able track the real fetch latency for both > broker follower and consumer. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
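One possible reading of the "real" latency described in the ticket is the total request time minus the time the request sat parked waiting for data. The sketch below assumes exactly that; the class and method names are hypothetical, and the metric definition actually proposed in KIP-736 may differ.

```java
// Illustrative arithmetic only; names are hypothetical and not part of Kafka.
final class FetchLatencySketch {

    /** Fetch latency in milliseconds with the time spent waiting for data removed. */
    static long latencyExcludingWaitMs(final long totalMs, final long waitedForDataMs) {
        if (waitedForDataMs < 0 || waitedForDataMs > totalMs) {
            throw new IllegalArgumentException("wait time must be between 0 and the total latency");
        }
        return totalMs - waitedForDataMs;
    }

    public static void main(final String[] args) {
        // A fetch that waited 500 ms for data to arrive and then took 3 ms to serve.
        System.out.println(latencyExcludingWaitMs(503, 500));
    }
}
```

Under this reading, a fetch that spends nearly all of its time in purgatory no longer inflates the P50 and P99 figures the ticket mentions.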
[jira] [Assigned] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present
[ https://issues.apache.org/jira/browse/KAFKA-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13141: --- Assignee: Rajini Sivaram (was: Jason Gustafson) > Leader should not update follower fetch offset if diverging epoch is present > > > Key: KAFKA-13141 > URL: https://issues.apache.org/jira/browse/KAFKA-13141 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 2.7.1 >Reporter: Jason Gustafson >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol > instead of using the old OffsetsForLeaderEpoch API. When truncation is > detected, we return a `divergingEpoch` field in the Fetch response, but we do > not set an error code. The sender is expected to check if the diverging epoch > is present and truncate accordingly. > All of this works correctly in the fetcher implementation, but the problem is > that the logic to update the follower fetch position on the leader does not > take into account the diverging epoch present in the response. This means the > fetch offsets can be updated incorrectly, which can lead to either log > divergence or the loss of committed data. > For example, we hit the following case with 3 replicas. Leader 1 is elected > in epoch 1 with an end offset of 100. The followers are at offset 101 > Broker 1: (Leader) Epoch 1 from offset 100 > Broker 2: (Follower) Epoch 1 from offset 101 > Broker 3: (Follower) Epoch 1 from offset 101 > Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the > divergence and returns a diverging epoch in the fetch state. Nevertheless, > the fetch positions for both followers are updated to 101 and the high > watermark is advanced. > After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a > network partition of some kind and was kicked from the ISR. 
This caused > broker 2 to get elected, which resulted in the following state at the start > of epoch 2. > Broker 1: (Follower) Epoch 2 from offset 101 > Broker 2: (Leader) Epoch 2 from offset 100 > Broker 3: (Follower) Epoch 2 from offset 100 > Broker 2 was then able to write a new entry at offset 100 and the old record > which may have been exposed to consumers was deleted by broker 1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
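The guard named in the ticket title can be sketched as follows. This is a simplified model with hypothetical names, not the leader-side code in Kafka: the leader records a follower's fetch offset only when the response it is about to send carries no diverging epoch.

```java
import java.util.OptionalInt;

// Simplified model of the leader's view of one follower; names are hypothetical.
final class FollowerFetchStateSketch {
    private long followerFetchOffset;

    FollowerFetchStateSketch(final long initialOffset) {
        this.followerFetchOffset = initialOffset;
    }

    /**
     * Records the follower's fetch offset unless the fetch response carries a diverging
     * epoch. Returns true if the offset was recorded.
     */
    boolean maybeUpdateFetchOffset(final long fetchOffset, final OptionalInt divergingEpoch) {
        if (divergingEpoch.isPresent()) {
            // The follower still has to truncate, so the offset it asked for cannot be
            // counted toward the high watermark.
            return false;
        }
        followerFetchOffset = fetchOffset;
        return true;
    }

    long followerFetchOffset() {
        return followerFetchOffset;
    }

    public static void main(final String[] args) {
        final FollowerFetchStateSketch follower = new FollowerFetchStateSketch(100L);
        System.out.println(follower.maybeUpdateFetchOffset(101L, OptionalInt.of(1)));
        System.out.println(follower.followerFetchOffset());
    }
}
```

In the ticket's example, this check would leave both followers' recorded positions untouched when they fetch at offset 101, so the high watermark would not advance past what the leader actually holds.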
[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel commented on a change in pull request #11135: URL: https://github.com/apache/kafka/pull/11135#discussion_r677821728 ## File path: clients/src/main/resources/common/message/MetadataRequest.json ## @@ -16,9 +16,9 @@ { "apiKey": 3, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["zkBroker", "broker"], "name": "MetadataRequest", - "validVersions": "0-11", + "validVersions": "0-12", Review comment: Makes sense. Will remove the version bump.
[GitHub] [kafka] hachikuji commented on a change in pull request #11134: KAFKA-12851: Fix Raft partition simulation
hachikuji commented on a change in pull request #11134: URL: https://github.com/apache/kafka/pull/11134#discussion_r677821521 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -171,6 +171,12 @@ private boolean updateHighWatermark() { || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset && !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata))) { highWatermark = highWatermarkUpdateOpt; +log.debug( Review comment: Do you think this level of detail is more suitable for trace? We have debug logging for high watermark advances in `KafkaRaftClient`.
[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel commented on a change in pull request #11135: URL: https://github.com/apache/kafka/pull/11135#discussion_r677820374 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -151,6 +150,11 @@ class ControllerApis(val requestChannel: RequestChannel, handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData])) } + // This API is not wired in yet as the controller does not implement the + // metadata completely. + // Leaving the code in to be completed and wired in the future + // + // See https://issues.apache.org/jira/browse/KAFKA-13143 for details Review comment: Will remove the code.
[GitHub] [kafka] hachikuji commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
hachikuji commented on a change in pull request #11135: URL: https://github.com/apache/kafka/pull/11135#discussion_r677819183 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -151,6 +150,11 @@ class ControllerApis(val requestChannel: RequestChannel, handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData])) } + // This API is not wired in yet as the controller does not implement the + // metadata completely. + // Leaving the code in to be completed and wired in the future + // + // See https://issues.apache.org/jira/browse/KAFKA-13143 for details Review comment: My preference would be to remove the handler completely. We can always bring it back in the future.
[GitHub] [kafka] hachikuji commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
hachikuji commented on a change in pull request #11135: URL: https://github.com/apache/kafka/pull/11135#discussion_r677818654 ## File path: clients/src/main/resources/common/message/MetadataRequest.json ## @@ -16,9 +16,9 @@ { "apiKey": 3, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["zkBroker", "broker"], "name": "MetadataRequest", - "validVersions": "0-11", + "validVersions": "0-12", Review comment: We shouldn't need to bump the version here if all we're changing is `listeners`.
[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel commented on a change in pull request #11135: URL: https://github.com/apache/kafka/pull/11135#discussion_r677815515 ## File path: clients/src/main/resources/common/message/MetadataRequest.json ## @@ -16,9 +16,9 @@ { "apiKey": 3, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["zkBroker", "broker"], "name": "MetadataRequest", - "validVersions": "0-11", + "validVersions": "0-12", Review comment: I am not sure if just modifying the valid versions bumps the version number (or if we want to bump the version at all) ## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ## @@ -145,6 +145,14 @@ class ControllerApisTest { authorizer } + @Test + def testHanldleMetadata(): Unit = { +val caught = assertThrows(classOf[ApiException], () => createControllerApis( + None, new MockController.Builder().build()). Review comment: I think I might have found an NPE. createControllerApis allows for Option[authorizer], however fails with an NPE if None is passed as an authorizer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] niket-goel opened a new pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers
niket-goel opened a new pull request #11135: URL: https://github.com/apache/kafka/pull/11135 This PR is WIP and the test added does not work yet.
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388304#comment-17388304 ] A. Sophie Blee-Goldman commented on KAFKA-8295: ---

I was just re-reading the wiki page on the Merge Operator, and now I wonder if it may not be _as_ helpful as I originally thought – but it can probably still offer some improvement. Here's my take, let me know what you think.

Regardless of whether a custom MergeOperator suffers from the same performance impact of crossing the JNI, I would bet that use cases such as list-append would still be more performant (since reading out an entire list, appending to it, and then writing the entire thing back is a lot of I/O). There are also the built-in, native MergeOperators that wouldn't need to cross the JNI, such as the UInt64AddOperator, as you point out. So there are definitely cases where a MergeOperator would still outperform an RMW (read-modify-write) sequence.

The thing I didn't fully appreciate before (but seems kind of obvious now that I think of it lol) is that the merge() call doesn't actually return the current value, either before or after the merge. So if we have to know this value in addition to updating it, we need to do a get(), and using merge() instead of RMW only saves us the cost of `put(full_merged_value) - put(single_update_value)` – which means that for constant-size values, like the uint64 unfortunately, there are pretty much no savings at all. So we don't even need to worry about whether/how to handle the fact that this is now a ValueAndTimestamp instead of a plain Value, i.e. a Long in the case of count(), because I don't think there's likely to be any performance improvement there. I didn't realize that at the time of filing this ticket, so maybe we should look past the current title of this ticket.

This still leaves some cases that could potentially benefit from even a custom MergeOperator, such as list-append or any other case where the difference in size between the full_merged_value and the single_update_value is very large. So it could be worth doing a POC of something like this and benchmarking it for a KIP. But tbh, having seen how messy it is to add new operators to the StateStore interface at the moment, I think we should probably try to avoid doing so unless there's good motivation and a clear benefit. In this case, while there may be a benefit, I'm not sure there's good motivation, since no user has requested this feature yet. Of course that could just be because they aren't aware of the possibility, so how about this: we update the title of this ticket to describe this possible new feature and then see if any users chime in here or vote on the ticket. If we gauge real user interest, then it makes more sense to put time into doing this. WDYT?

> Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
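The `put(full_merged_value) - put(single_update_value)` estimate from the comment above can be made concrete with a little arithmetic. The sketch below is a rough model with hypothetical names and example sizes; it counts only bytes handed to the store on write and does not touch RocksDB at all.

```java
// Rough model of the write-size difference between RMW and merge(); sizes are in bytes.
// All names and example sizes are illustrative assumptions, not measurements.
final class MergeSavingsSketch {

    /**
     * Bytes saved per update when the caller still has to get() the current value:
     * RMW writes the full merged value back, merge() writes only the single update.
     */
    static long putBytesSaved(final long fullMergedValueBytes, final long singleUpdateValueBytes) {
        if (fullMergedValueBytes < 0 || singleUpdateValueBytes < 0) {
            throw new IllegalArgumentException("sizes must be non-negative");
        }
        return fullMergedValueBytes - singleUpdateValueBytes;
    }

    public static void main(final String[] args) {
        // count(): the merged counter and the increment are both 8-byte longs.
        System.out.println(putBytesSaved(8, 8));
        // list-append: a 1,000,000-byte list gains one 16-byte element.
        System.out.println(putBytesSaved(1_000_000 + 16, 16));
    }
}
```

The counter case saves nothing while the list-append case saves roughly the size of the existing list, which matches the distinction drawn in the comment.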
[jira] [Resolved] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE
[ https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-13139. Resolution: Fixed > Empty response after requesting to restart a connector without the tasks > results in NPE > --- > > Key: KAFKA-13139 > URL: https://issues.apache.org/jira/browse/KAFKA-13139 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 3.0.0 > > > After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart > only the connector (without any tasks) returns OK with an empty body. > As system test runs revealed, this causes an NPE in > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135] > We should return 204 (NO_CONTENT) instead. > This is a regression from previous behavior, therefore the ticket is marked > as a blocker candidate for 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine merged pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE
kkonstantine merged pull request #11132: URL: https://github.com/apache/kafka/pull/11132
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r677786276 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -345,8 +343,17 @@ private SinkNodeFactory(final String name, } } +public void setTopologyName(final String namedTopology) { Review comment: I see.
[jira] [Assigned] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
[ https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13143: --- Assignee: Niket Goel (was: Jose Armando Garcia Sancio) > Disable Metadata endpoint for KRaft controller > -- > > Key: KAFKA-13143 > URL: https://issues.apache.org/jira/browse/KAFKA-13143 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > The controller currently implements Metadata incompletely. Specifically, it > does not return the metadata for any topics in the cluster. This may tend to > cause confusion to users. For example, if someone used the controller > endpoint by mistake in `kafka-topics.sh --list`, then they would see no > topics in the cluster, which would be surprising. It would be better for 3.0 > to disable Metadata on the controller since we currently expect clients to > connect through brokers anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
[ https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13143: Issue Type: Bug (was: Improvement) > Disable Metadata endpoint for KRaft controller > -- > > Key: KAFKA-13143 > URL: https://issues.apache.org/jira/browse/KAFKA-13143 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Fix For: 3.0.0 > > > The controller currently implements Metadata incompletely. Specifically, it > does not return the metadata for any topics in the cluster. This may tend to > cause confusion to users. For example, if someone used the controller > endpoint by mistake in `kafka-topics.sh --list`, then they would see no > topics in the cluster, which would be surprising. It would be better for 3.0 > to disable Metadata on the controller since we currently expect clients to > connect through brokers anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r677785678 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro return Collections.unmodifiableMap(globalStateStores); } -public Set allStateStoreName() { +public Set allStateStoreNames() { Objects.requireNonNull(applicationId, "topology has not completed optimization"); final Set allNames = new HashSet<>(stateFactories.keySet()); allNames.addAll(globalStateStores.keySet()); return Collections.unmodifiableSet(allNames); } +public boolean hasStore(final String name) { +return stateFactories.containsKey(name) || globalStateStores.containsKey(name); +} + +public boolean hasPersistentStores() { Review comment: I'm not against this idea, just wondering what's the rationale behind it :) I'm happy with what you said. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r677784899 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -345,8 +343,17 @@ private SinkNodeFactory(final String name, } } +public void setTopologyName(final String namedTopology) { +Objects.requireNonNull(namedTopology, "named topology can't be null"); +if (this.namedTopology != null) { +log.error("Tried to reset the namedTopology to {} but it was already set to {}", namedTopology, this.namedTopology); +throw new IllegalStateException("NamedTopology has already been set to " + this.namedTopology); +} +this.namedTopology = namedTopology; +} + // public for testing only -public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) { Review comment: Ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
guozhangwang commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r677784769 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are topologies but they are all empty, this indicates a bug in user code +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { Review comment: Yeah what I meant is that, if we can ever reach this condition then it seems the extra check is redundant. I will check the current logic again. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE
kkonstantine commented on pull request #11132: URL: https://github.com/apache/kafka/pull/11132#issuecomment-887820306 A single, non-relevant, test failed on the second build: `Build / JDK 8 and Scala 2.12 / kafka.server.DelegationTokenRequestsTest.testDelegationTokenRequests()`
[GitHub] [kafka] kkonstantine commented on pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE
kkonstantine commented on pull request #11132: URL: https://github.com/apache/kafka/pull/11132#issuecomment-887819548 Thank you both. Merging to trunk and 3.0
[jira] [Created] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
Jason Gustafson created KAFKA-13143: --- Summary: Disable Metadata endpoint for KRaft controller Key: KAFKA-13143 URL: https://issues.apache.org/jira/browse/KAFKA-13143 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 The controller currently implements Metadata incompletely. Specifically, it does not return the metadata for any topics in the cluster. This may tend to cause confusion to users. For example, if someone used the controller endpoint by mistake in `kafka-topics.sh --list`, then they would see no topics in the cluster, which would be surprising. It would be better for 3.0 to disable Metadata on the controller since we currently expect clients to connect through brokers anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13142) KRaft controller does not validate dynamic configs
Ryan Dielhenn created KAFKA-13142: - Summary: KRaft controller does not validate dynamic configs Key: KAFKA-13142 URL: https://issues.apache.org/jira/browse/KAFKA-13142 Project: Kafka Issue Type: Task Components: kraft Affects Versions: 3.0.0 Reporter: Ryan Dielhenn Assignee: Ryan Dielhenn The KRaft controller is not currently validating dynamic configs. To ensure that KRaft clusters are easily upgradable it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[jira] [Created] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present
Jason Gustafson created KAFKA-13141: --- Summary: Leader should not update follower fetch offset if diverging epoch is present Key: KAFKA-13141 URL: https://issues.apache.org/jira/browse/KAFKA-13141 Project: Kafka Issue Type: Bug Affects Versions: 2.7.1, 2.8.0 Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0, 2.7.2, 2.8.1 In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol instead of using the old OffsetsForLeaderEpoch API. When truncation is detected, we return a `divergingEpoch` field in the Fetch response, but we do not set an error code. The sender is expected to check if the diverging epoch is present and truncate accordingly. All of this works correctly in the fetcher implementation, but the problem is that the logic to update the follower fetch position on the leader does not take into account the diverging epoch present in the response. This means the fetch offsets can be updated incorrectly, which can lead to either log divergence or the loss of committed data. For example, we hit the following case with 3 replicas. Leader 1 is elected in epoch 1 with an end offset of 100. The followers are at offset 101 Broker 1: (Leader) Epoch 1 from offset 100 Broker 2: (Follower) Epoch 1 from offset 101 Broker 3: (Follower) Epoch 1 from offset 101 Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the divergence and returns a diverging epoch in the fetch state. Nevertheless, the fetch positions for both followers are updated to 101 and the high watermark is advanced. After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a network partition of some kind and was kicked from the ISR. This caused broker 2 to get elected, which resulted in the following state at the start of epoch 2. 
Broker 1: (Follower) Epoch 2 from offset 101 Broker 2: (Leader) Epoch 2 from offset 100 Broker 3: (Follower) Epoch 2 from offset 100 Broker 2 was then able to write a new entry at offset 100 and the old record which may have been exposed to consumers was deleted by broker 1.
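A minimal sketch of the guard this issue asks for, under stated assumptions: `FollowerState` and `maybeUpdate` are hypothetical names for illustration and are not Kafka's actual replica-tracking classes. The idea is only that the leader records a follower's fetch position when the response carries no diverging epoch.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model of leader-side bookkeeping; not Kafka's real classes.
final class FollowerState {
    // -1 means the leader has not recorded any position for this follower yet.
    private long fetchOffset = -1L;

    long fetchOffset() {
        return fetchOffset;
    }

    // Records the follower's position only when no divergence was reported.
    // If a diverging epoch is present, the follower still has to truncate, so
    // the offset it fetched at must not count towards the high watermark.
    boolean maybeUpdate(long fetchedAt, OptionalInt divergingEpoch) {
        if (divergingEpoch.isPresent()) {
            return false;
        }
        fetchOffset = fetchedAt;
        return true;
    }
}
```

In the scenario above, the fetches at offset 101 would be ignored for position tracking, and only the post-truncation fetches at offset 100 would be recorded.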
[jira] [Comment Edited] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388235#comment-17388235 ] Sagar Rao edited comment on KAFKA-8295 at 7/27/21, 6:15 PM: [~ableegoldman].. wanted to know, if we can have a KIP for merge() operator in the KV store irrespective of using merge operator from Rocksdb or not(option 1 from above) ? I will still be doing some benchmarks on it.. That allows users to use the store for counter specific use cases. WDYT? was (Author: sagarrao): [~ableegoldman].. wanted to know, if we can have a KIP for merge() operator in the KV store irrespective of using merge operator from Rocksdb or not(option 1 from above) ? That allows users to use the store for counter specific use cases. WDYT? > Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388235#comment-17388235 ] Sagar Rao commented on KAFKA-8295: -- [~ableegoldman].. wanted to know, if we can have a KIP for merge() operator in the KV store irrespective of using merge operator from Rocksdb or not(option 1 from above) ? That allows users to use the store for counter specific use cases. WDYT? > Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
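To illustrate the difference between the read/update/write path and a merge-style call, here is a rough in-memory analogy. It uses `java.util.Map.merge` only as a stand-in; it is not the RocksDB merge operator and not a proposed KV store API, and `CountStore` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory analogy only; a plain map stands in for the state store.
final class CountStore {
    private final Map<String, Long> store = new HashMap<>();

    // Read/update/write: the caller reads the old value and writes the new one.
    void countWithGetPut(String key) {
        Long old = store.get(key);
        store.put(key, old == null ? 1L : old + 1L);
    }

    // Merge: a single call hands the increment to the store, which combines it
    // with whatever value is already present.
    void countWithMerge(String key) {
        store.merge(key, 1L, Long::sum);
    }

    long get(String key) {
        return store.getOrDefault(key, 0L);
    }
}
```

Both methods produce the same counts; the point of a merge() on the store would be that the caller no longer performs the read itself.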
[jira] [Assigned] (KAFKA-13095) TransactionsTest is failing in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13095: --- Assignee: Jason Gustafson (was: David Arthur) > TransactionsTest is failing in kraft mode > - > > Key: KAFKA-13095 > URL: https://issues.apache.org/jira/browse/KAFKA-13095 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Colin McCabe >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0
[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
guozhangwang commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r677680568 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int, maxBatchSize ) - partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) => + var breakIteration = false Review comment: In that case, I feel it may actually be cleaner to inline `collectExpiredTransactionalIds` into the caller, so that we get just one while loop / flag. We can still distinguish the case where the log is offline, and hence we should not proceed, vs. the case where the batch is full, and we should write once and proceed. WDYT?
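One way to picture the single-loop shape suggested here, as a sketch under assumptions: `ExpirationBatcher` is a hypothetical helper, the id length stands in for the serialized record size, and an empty `maxBatchSize` models the "log is offline" case. It is not the actual `TransactionStateManager` code, which is Scala.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical helper for illustration; not the real expiration logic.
final class ExpirationBatcher {
    // Groups expired ids into batches whose summed size stays within maxBatchSize.
    // An empty maxBatchSize means the log config is unavailable: collect nothing.
    static List<List<String>> batches(List<String> expiredIds, OptionalInt maxBatchSize) {
        List<List<String>> result = new ArrayList<>();
        if (!maxBatchSize.isPresent()) {
            return result; // offline: do not proceed
        }
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String id : expiredIds) {
            int size = id.length(); // stand-in for the record size
            if (!current.isEmpty() && currentSize + size > maxBatchSize.getAsInt()) {
                result.add(current); // batch full: write once, then keep going
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(id);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

The two early-exit reasons stay distinguishable: the offline case returns before the loop, while a full batch only closes the current batch and continues.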
[GitHub] [kafka] vamossagar12 commented on pull request #11076: KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw…
vamossagar12 commented on pull request #11076: URL: https://github.com/apache/kafka/pull/11076#issuecomment-887710210 @ableegoldman .. sorry to bother you again on this one.. but could you plz review whenever you get the chance..
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-887703246 hey @guozhangwang / @cadonna .. sorry for being nosey here but did you get a chance to look at these numbers?
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r677652114 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int, maxBatchSize ) - partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) => + var breakIteration = false Review comment: We've changed the logic to check the log configuration to get the batch size. If the partition is offline, then `ReplicaManager.getLogConfig` will return `None`. I have a test case which shows this path.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
guozhangwang commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r677649333 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int, maxBatchSize ) - partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) => + var breakIteration = false Review comment: I think I may be missing something here, could you explain why we'd need both the caller and the callee doing this while loop with the break iteration flag? It seems to me that we have multiple reasons to break early: 1) we've reached the record limit, 2) the partition is already offline, and we want to treat them differently, hence both while loops, right? But I cannot see exactly where we may have case 2)?
[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener
jsancio commented on a change in pull request #11116: URL: https://github.com/apache/kafka/pull/11116#discussion_r677635360 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -783,6 +798,11 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { }); } else if (curClaimEpoch != -1) { appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> { +if (this != metaLogListener) { Review comment: Good idea. Fixed.
[GitHub] [kafka] jsancio opened a new pull request #11134: KAFKA-12851: Fix Raft partition simulation
jsancio opened a new pull request #11134: URL: https://github.com/apache/kafka/pull/11134 Instead of waiting for a high-watermark of 20 after the partition, the test should wait for the high-watermark to reach an offset greater than the largest log end offset at the time of the partition. Only that offset is guaranteed to be reached as the high-watermark by the new majority. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
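The wait condition described here can be sketched as follows. This is an illustration only: `PartitionProgress` and its methods are hypothetical names, not part of the Raft simulation test, and the offsets in the usage note are made up.

```java
import java.util.Collection;

// Hypothetical helper for illustration; not the actual simulation test code.
final class PartitionProgress {
    // Largest log end offset across all nodes, captured when the partition starts.
    static long largestLogEndOffset(Collection<Long> logEndOffsets) {
        return logEndOffsets.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    // The new majority has made progress once the high-watermark exceeds
    // the largest log end offset seen at the time of the partition.
    static boolean majorityAdvanced(long highWatermark, long largestLeoAtPartition) {
        return highWatermark > largestLeoAtPartition;
    }
}
```

For example, if the log end offsets at partition time were 12, 25 and 18, a high-watermark of 20 would not yet show progress by the new majority, whereas 26 would.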
[GitHub] [kafka] jsancio commented on a change in pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics
jsancio commented on a change in pull request #11133: URL: https://github.com/apache/kafka/pull/11133#discussion_r677612215 ## File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala ## @@ -100,6 +101,9 @@ class KafkaRaftServer( controllerQuorumVotersFuture )) } else { +// we need to register the various kafka.controller metrics +// for backwards compatibility with the ZooKeeper-based case +new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry()) Review comment: This works because the constructor of `QuorumControllerMetrics` registers those metrics against the Yammer registry. How about having a static method in `QuorumControllerMetrics` that does this and doesn't use gauges since they are not needed?
[jira] [Assigned] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors
[ https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] KahnCheny reassigned KAFKA-12793: - Assignee: KahnCheny > Client-side Circuit Breaker for Partition Write Errors > -- > > Key: KAFKA-12793 > URL: https://issues.apache.org/jira/browse/KAFKA-12793 > Project: Kafka > Issue Type: New Feature > Components: clients >Reporter: KahnCheny >Assignee: KahnCheny >Priority: Major > Attachments: KAFKA-12793.patch > > > When Kafka is used to build data pipeline in mission critical business > scenarios, availability and throughput are the most important operational > goals that need to be maintained in presence of transient or permanent local > failure. One typical situation that requires Ops intervention is disk > failure, some partitions have long write latency caused by extremely high > disk utilization; since all partitions share the same buffer under the > current producer thread model, the buffer will be filled up quickly and > eventually the good partitions are impacted as well. The cluster level > success rate and timeout ratio will degrade until the local infrastructure > issue is resolved. > One way to mitigate this issue is to add client side mechanism to short > circuit problematic partitions during transient failure. Similar approach is > applied in other distributed systems and RPC frameworks. > We propose to add a configuration driven circuit breaking mechanism that > allows Kafka client to ‘mute’ partitions when certain condition is met. The > mechanism adds callbacks in Sender class workflow that allows to filtering > partitions based on certain policy. > The client can choose proper implementation that fits a special failure > scenario, Client-side custom implementation of Partitioner and > ProducerInterceptor > * Customize the implementation of ProducerInterceptor, and choose the > strategy to mute partitions. 
> * Customize the implementation of Partitioner, and choose the strategy to > filter partitions. > Muting partitions has an impact when the topic contains keyed messages, as > messages will be written to more than one partition during the period of > recovery. We believe this can be an explicit trade-off the application makes > between availability and message ordering. > KIP-693: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]
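A rough sketch of what a muting policy could look like, under loud assumptions: the issue does not specify the policy, so the consecutive-failure threshold, the class name `PartitionCircuitBreaker`, and all method names here are hypothetical and are not taken from KIP-693 or the attached patch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical policy for illustration: mute a partition after N consecutive failures.
final class PartitionCircuitBreaker {
    private final int failureThreshold;
    private final Map<Integer, Integer> consecutiveFailures = new HashMap<>();
    private final Set<Integer> muted = new HashSet<>();

    PartitionCircuitBreaker(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    // Counts a failed write and mutes the partition once the threshold is reached.
    void onFailure(int partition) {
        int failures = consecutiveFailures.merge(partition, 1, Integer::sum);
        if (failures >= failureThreshold) {
            muted.add(partition);
        }
    }

    // A successful write resets the failure streak for that partition.
    void onSuccess(int partition) {
        consecutiveFailures.remove(partition);
    }

    // Explicit recovery hook; a real policy would likely unmute after a timeout.
    void unmute(int partition) {
        muted.remove(partition);
        consecutiveFailures.remove(partition);
    }

    boolean isMuted(int partition) {
        return muted.contains(partition);
    }
}
```

A custom partitioner could then skip partitions for which `isMuted` returns true, accepting the ordering trade-off for keyed messages that the description mentions.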
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-887638986 Something went wrong during the build, can someone re-trigger the build in Jenkins? Failure was: > [2021-07-27T14:23:53.802Z] FAILURE: Build failed with an exception. [2021-07-27T14:23:53.802Z] [2021-07-27T14:23:53.802Z] * What went wrong: [2021-07-27T14:23:53.802Z] Execution failed for task ':storage:unitTest'. [2021-07-27T14:23:53.802Z] > Process 'Gradle Test Executor 68' finished with non-zero exit value 1 [2021-07-27T14:23:53.802Z] This problem might be caused by incorrect test process configuration. [2021-07-27T14:23:53.802Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.1.1/userguide/java_testing.html#sec:test_execution
[GitHub] [kafka] kirktrue edited a comment on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue edited a comment on pull request #10980: URL: https://github.com/apache/kafka/pull/10980#issuecomment-887635572 > Why is `NetworkClient#send` throwing an exception? It shouldn't be doing that, right? Can you explain more about the problem that this PR fixes? Per [the original issue](https://issues.apache.org/jira/browse/KAFKA-12989) the `MockClient` used for testing allows for fault injection via the `RequestMatcher`. If the test sets up the condition where the request _doesn't_ match some condition, the `MockClient.send` method is supposed to throw an `IllegalStateException`. That change seemed straightforward except that this now caused problems in `KafkaAdminClient`. Because it's not expecting any errors, this exception causes the thread in `KafkaAdminClient.sendEligibleCalls` that is servicing requests to die, hence my addition of the `try`/`catch` wrapper. That said, I'm not 100% confident that this change is the right way to handle things. Please advise. Thanks!
[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue commented on pull request #10980: URL: https://github.com/apache/kafka/pull/10980#issuecomment-887635572 > Why is `NetworkClient#send` throwing an exception? It shouldn't be doing that, right? Can you explain more about the problem that this PR fixes? Per [the original issue](https://issues.apache.org/jira/browse/KAFKA-12989) the `MockClient` used for testing allows for fault injection via the `RequestMatcher`. If the test sets up the condition where the request _doesn't_ match some condition, the `MockClient.send` method throws an `IllegalStateException`. Because it's not expecting any errors, this exception causes the thread in `KafkaAdminClient` that is servicing requests to die, hence the `try`/`catch` wrapper. That said, I'm not 100% confident that this change is the right way to handle things.
[GitHub] [kafka] kirktrue commented on pull request #10951: KAFKA-12841: NPE from the provided metadata in client callback in case of ApiException
kirktrue commented on pull request #10951: URL: https://github.com/apache/kafka/pull/10951#issuecomment-887627439 Thanks @cmccabe for looking at this! > If I understand correctly, the PR makes up a fake partition so that `TopicPartition` can be non-null in the callback. I don't think that this is the right thing to do. Yeah, I'm not crazy about it either. There was some "prior art" in the codebase where the same thing was done (`ProducerInterceptors.onSendError`) and the existence of `RecordMetadata.UNKNOWN_PARTITION` suggested that it might be permissible for these kinds of cases. > Why not change the JavaDoc for the callback to indicate that `TopicPartition` can be null if the method fails before it gets assigned? My interpretation was that was akin to changing the interface of the `Callback` API and could cause some `NullPointerException` cases in users' code. I'm happy to make the change to the JavaDoc, as suggested. Thanks!
[jira] [Assigned] (KAFKA-10310) Kafka Raft Snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio reassigned KAFKA-10310: -- Assignee: Jose Armando Garcia Sancio (was: loboxu) > Kafka Raft Snapshot > --- > > Key: KAFKA-10310 > URL: https://issues.apache.org/jira/browse/KAFKA-10310 > Project: Kafka > Issue Type: New Feature >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for [KIP-630: Kafka Raft > Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]
[GitHub] [kafka] douglasawh commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.
douglasawh commented on pull request #8656: URL: https://github.com/apache/kafka/pull/8656#issuecomment-887607101 Any thoughts on when this might get merged? I see that checks have failed, so I'm guessing the fix is not ideal, but I haven't looked at the code in depth.
[GitHub] [kafka] rondagostino opened a new pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics
rondagostino opened a new pull request #11133: URL: https://github.com/apache/kafka/pull/11133 Several controller metrics are exposed on every broker in a ZooKeeper-based (i.e. non-KRaft) cluster regardless of whether the broker is the active controller or not, but these metrics are not exposed on KRaft nodes that have process.roles=broker (i.e. KRaft nodes that do not implement the controller role). For backwards compatibility, KRaft nodes that are just brokers should expose these metrics with values all equal to 0: just like ZooKeeper-based brokers do when they are not the active controller. This patch adds these metrics and an associated test case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13140) KRaft brokers do not expose kafka.controller metrics, breaking backwards compatibility
Ron Dagostino created KAFKA-13140: - Summary: KRaft brokers do not expose kafka.controller metrics, breaking backwards compatibility Key: KAFKA-13140 URL: https://issues.apache.org/jira/browse/KAFKA-13140 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 2.8.0, 3.0.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 3.1.0 The following controller metrics are exposed on every broker in a ZooKeeper-based (i.e. non-KRaft) cluster regardless of whether the broker is the active controller or not, but these metrics are not exposed on KRaft nodes that have process.roles=broker (i.e. KRaft nodes that do not implement the controller role). For backwards compatibility, KRaft nodes that are just brokers should expose these metrics with values all equal to 0: just like ZooKeeper-based brokers do when they are not the active controller. kafka.controller:type=KafkaController,name=ActiveControllerCount kafka.controller:type=KafkaController,name=GlobalTopicCount kafka.controller:type=KafkaController,name=GlobalPartitionCount kafka.controller:type=KafkaController,name=OfflinePartitionsCount kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
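As a sketch of the requested behavior, assuming a plain map in place of the real Yammer metrics registry: `BrokerOnlyControllerMetrics` and `register` are hypothetical names, and only the metric names are taken from the list above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for metric registration; a map replaces the real registry.
final class BrokerOnlyControllerMetrics {
    static final List<String> NAMES = Arrays.asList(
        "ActiveControllerCount",
        "GlobalTopicCount",
        "GlobalPartitionCount",
        "OfflinePartitionsCount",
        "PreferredReplicaImbalanceCount");

    // Registers every controller metric with a constant value of 0, mirroring what
    // a ZooKeeper-based broker reports when it is not the active controller.
    static Map<String, Supplier<Integer>> register() {
        Map<String, Supplier<Integer>> registry = new LinkedHashMap<>();
        for (String name : NAMES) {
            registry.put("kafka.controller:type=KafkaController,name=" + name, () -> 0);
        }
        return registry;
    }
}
```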
[GitHub] [kafka] rhauch commented on a change in pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE
rhauch commented on a change in pull request #11132: URL: https://github.com/apache/kafka/pull/11132#discussion_r677530176

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java

@@ -269,7 +269,7 @@ public Response restartConnector(final @PathParam("connector") String connector,
         FutureCallback cb = new FutureCallback<>();
         herder.restartConnector(connector, cb);
         completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
-        return Response.ok().build();
+        return Response.noContent().build();

Review comment: I've corrected the KIP and sent an email describing this minor correction to the vote thread for the KIP. I've also added [a comment on KAFKA-13139](https://issues.apache.org/jira/browse/KAFKA-13139?focusedCommentId=17388116&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17388116) that describes the root cause and the KIP correction. Thanks, @kkonstantine and @kpatelatwork.
[jira] [Commented] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE
[ https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388116#comment-17388116 ]

Randall Hauch commented on KAFKA-13139:
---

Just to clarify what appears to have happened. As [~kpatelatwork] [mentions in a comment on the PR|https://github.com/apache/kafka/pull/11132], the behavior of the Connect restart API in AK 2.8 and earlier was always to return "204 NO CONTENT", not "200 OK" as mentioned in [KIP-745|https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks]. Although the code used `Response.ok().build()`, the `RestClient` always processed the absence of a response body as `204 NO CONTENT`. So, to maintain the actual AK 2.x behavior in this branch of the code, we should instead return `204 NO CONTENT`.

I've corrected the KIP to reflect this older actual behavior of returning "204 NO CONTENT". It was a minor but necessary correction.

Note that we have *not* changed the KIP or the behavior of returning "202 ACCEPTED" when `includeTasks=true` and/or `failedOnly=true`. These cases correspond to the new behavior added in KIP-745.

> Empty response after requesting to restart a connector without the tasks results in NPE
> ---
>
> Key: KAFKA-13139
> URL: https://issues.apache.org/jira/browse/KAFKA-13139
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.0
> Reporter: Konstantine Karantasis
> Assignee: Konstantine Karantasis
> Priority: Blocker
> Fix For: 3.0.0
>
> After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart only the connector (without any tasks) returns OK with an empty body.
> As system test runs revealed, this causes an NPE in [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135]
> We should return 204 (NO_CONTENT) instead.
> This is a regression from previous behavior, therefore the ticket is marked as a blocker candidate for 3.0
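The status-code rule described in this comment can be sketched as a small decision function. This is an illustration under stated assumptions, not the Connect implementation: the class `RestartStatusSketch` and both methods are hypothetical, and the parameter names simply follow the `includeTasks` and `failedOnly` spellings used in the comment above.

```java
// Hypothetical illustration only; not the Connect REST resource code.
final class RestartStatusSketch {
    static final int ACCEPTED = 202;
    static final int NO_CONTENT = 204;

    // Restarting only the connector keeps the AK 2.x behavior (204);
    // the newer KIP-745 options answer with 202.
    static int statusFor(boolean includeTasks, boolean failedOnly) {
        return (includeTasks || failedOnly) ? ACCEPTED : NO_CONTENT;
    }

    // A 204 response never carries a body, so a client can skip
    // deserialization instead of dereferencing a missing body.
    static boolean shouldParseBody(int status) {
        return status != NO_CONTENT;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(false, false));
        System.out.println(statusFor(true, false));
    }
}
```

Returning 204 rather than 200 with an empty body makes the absence of a payload explicit, which is the property the forwarding client relies on.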
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-887529961 As new code has been added I need to re-run scalafmt for those. Pushing those changes in a second.
[GitHub] [kafka] rondagostino commented on pull request #11131: KAFKA-13137: KRaft Controller Metric MBean names incorrectly quoted
rondagostino commented on pull request #11131: URL: https://github.com/apache/kafka/pull/11131#issuecomment-887525250 Thanks for the reviews @hachikuji and @showuon. The test is revamped so that the method is `private static void assertExpectedMetrics(Set expectedMetricNames, String expectedType)` as suggested, and all of the assertion messages now make sense if there is a failure.
[GitHub] [kafka] jlprat edited a comment on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat edited a comment on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-887524211 Thanks for the review @vvcephei ! Actually, the spotless task executes scalafmt in this Gradle config, which means that running `./gradlew spotlessScalaCheck` will analyze all Scala files and complain about violations in the code. Running `./gradlew :spotlessScalaApply` will then fix any formatting discrepancies. Alternatively, one can configure IntelliJ or VS Code to run scalafmt, and it will pick up the existing configuration in the kafka repo.
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-887524211 Thanks for the review @vvcephei ! Actually, the spotless task executes scalafmt in this Gradle config, which means that running `./gradlew spotlessScalaCheck` will analyze all Scala files and complain about violations in the code. Running `./gradlew :spotlessScalaApply` will then fix any formatting discrepancies.
[GitHub] [kafka] vvcephei commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
vvcephei commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-887513532 It looks like there's a new conflict. Hopefully, we can merge soon after you fix the conflict this time. By the way, can you let me know the command you used to apply the format? I've been accustomed to using Spotless Scala in this repo; I didn't know about Scala fmt until just now.
[jira] [Assigned] (KAFKA-10310) Kafka Raft Snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

loboxu reassigned KAFKA-10310:
--
Assignee: loboxu (was: Jose Armando Garcia Sancio)

> Kafka Raft Snapshot
> ---
>
> Key: KAFKA-10310
> URL: https://issues.apache.org/jira/browse/KAFKA-10310
> Project: Kafka
> Issue Type: New Feature
> Reporter: Jose Armando Garcia Sancio
> Assignee: loboxu
> Priority: Major
>
> Tracking issue for [KIP-630: Kafka Raft Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries
patrickstuedi commented on a change in pull request #11120: URL: https://github.com/apache/kafka/pull/11120#discussion_r677296530

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java

@@ -63,6 +65,9 @@
         final KeyValue next = super.makeNext();
         if (next == null) {
             return allDone();
+        } else if (rawLastKey == null) {
+            return next;

Review comment: Added a comment
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries
patrickstuedi commented on a change in pull request #11120: URL: https://github.com/apache/kafka/pull/11120#discussion_r677292872

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java

@@ -44,17 +44,19 @@
         this.forward = forward;
         this.toInclusive = toInclusive;
         if (forward) {
-            iter.seek(from.get());
-            rawLastKey = to.get();
-            if (rawLastKey == null) {
-                throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to);
+            if (from == null) {
+                iter.seekToFirst();
+            } else {
+                iter.seek(from.get());
             }
+            rawLastKey = to == null ? null : to.get();

Review comment: Thanks for the comment @showuon , fixed!
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries
patrickstuedi commented on a change in pull request #11120: URL: https://github.com/apache/kafka/pull/11120#discussion_r677285658

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java

@@ -63,17 +65,20 @@
         final KeyValue next = super.makeNext();
         if (next == null) {
             return allDone();
+        } else if (rawLastKey == null) {
+            return next;
+
         } else {
             if (forward) {
-                if (comparator.compare(next.key.get(), rawLastKey) < 0) {
+                if (rawLastKey != null && comparator.compare(next.key.get(), rawLastKey) < 0) {

Review comment: Good catch. Yes this check can be eliminated now that there is a common null check in the beginning.
[jira] [Assigned] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust
[ https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

KahnCheny reassigned KAFKA-13120:
-
Assignee: Reggie Hsu (was: KahnCheny)

> Flesh out `streams_static_membership_test` to be more robust
>
> Key: KAFKA-13120
> URL: https://issues.apache.org/jira/browse/KAFKA-13120
> Project: Kafka
> Issue Type: Task
> Components: streams, system tests
> Reporter: Leah Thomas
> Assignee: Reggie Hsu
> Priority: Minor
> Labels: newbie++
>
> When fixing the `streams_static_membership_test.py` we noticed that the test is pretty bare bones, it creates a streams application but doesn't do much with the streams application, eg has no stateful processing. We should flesh this out a bit to be more realistic and potentially consider testing with EOS as well. The full java test is in `StaticMembershipTestClient`
[jira] [Assigned] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust
[ https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

KahnCheny reassigned KAFKA-13120:
-
Assignee: KahnCheny (was: Reggie Hsu)

> Flesh out `streams_static_membership_test` to be more robust
>
> Key: KAFKA-13120
> URL: https://issues.apache.org/jira/browse/KAFKA-13120
> Project: Kafka
> Issue Type: Task
> Components: streams, system tests
> Reporter: Leah Thomas
> Assignee: KahnCheny
> Priority: Minor
> Labels: newbie++
>
> When fixing the `streams_static_membership_test.py` we noticed that the test is pretty bare bones, it creates a streams application but doesn't do much with the streams application, eg has no stateful processing. We should flesh this out a bit to be more realistic and potentially consider testing with EOS as well. The full java test is in `StaticMembershipTestClient`
[GitHub] [kafka] showuon commented on a change in pull request #11120: Add support for infinite endpoints for range queries
showuon commented on a change in pull request #11120: URL: https://github.com/apache/kafka/pull/11120#discussion_r677209624

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java

@@ -63,6 +65,9 @@
         final KeyValue next = super.makeNext();
         if (next == null) {
             return allDone();
+        } else if (rawLastKey == null) {
+            return next;

Review comment: We changed the meaning of `rawLastKey`, so we need to add a comment for the `rawLastKey == null` case.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java

@@ -44,17 +44,19 @@
         this.forward = forward;
         this.toInclusive = toInclusive;
         if (forward) {
-            iter.seek(from.get());
-            rawLastKey = to.get();
-            if (rawLastKey == null) {
-                throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to);
+            if (from == null) {
+                iter.seekToFirst();
+            } else {
+                iter.seek(from.get());
             }
+            rawLastKey = to == null ? null : to.get();

Review comment: Do we still need the `rawLastKey` null check for the `to.get()` case? The same comment applies to the other similar places.
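The changed meaning of a null bound in this review thread can be illustrated with a self-contained forward range scan. This is a sketch under stated assumptions, not the RocksDB-backed iterator: it substitutes an in-memory `NavigableMap` for the store, and the class `OpenEndedRangeSketch` and method `range` are hypothetical. A null `from` starts at the first key (mirroring `seekToFirst()`), and a null `to` means there is no upper bound, so every remaining key is returned without a comparison.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; uses a TreeMap instead of RocksDB.
final class OpenEndedRangeSketch {

    static List<String> range(NavigableMap<String, String> store,
                              String from, String to, boolean toInclusive) {
        // null 'from': start at the first key; otherwise seek to 'from'.
        Iterator<String> keys =
            (from == null ? store : store.tailMap(from, true)).keySet().iterator();
        List<String> result = new ArrayList<>();
        while (keys.hasNext()) {
            String key = keys.next();
            if (to != null) {
                // A bounded scan stops once the key passes the upper bound.
                int cmp = key.compareTo(to);
                if (cmp > 0 || (cmp == 0 && !toInclusive)) {
                    break;
                }
            }
            // null 'to': unbounded, so the key is returned as-is.
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String k : new String[] {"a", "b", "c", "d"}) {
            store.put(k, "value-" + k);
        }
        System.out.println(range(store, null, null, true));
        System.out.println(range(store, "b", null, true));
        System.out.println(range(store, null, "b", true));
    }
}
```

Checking the null bound once, before any comparison, is what makes a second `rawLastKey != null` guard inside the comparison redundant.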