[GitHub] [kafka] showuon commented on a change in pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config
showuon commented on a change in pull request #10811: URL: https://github.com/apache/kafka/pull/10811#discussion_r661098484 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -68,7 +68,7 @@ object ConfigCommand extends Config { val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType - val ZkSupportedConfigTypes = ConfigType.all + val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker) Review comment: If you mean this comment: > // Dynamic broker configs can only be updated using the new AdminClient once brokers have started // so that configs may be fully validated. Prior to starting brokers, updates may be performed using // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK, // avoiding clear passwords in server.properties. For consistency with older versions, quota-related // broker configs can still be updated using ZooKeeper at any time. I checked and confirmed that it refers to these 3 configs of "broker" type (not "quota" type): ``` val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set( DynamicConfig.Broker.LeaderReplicationThrottledRateProp, DynamicConfig.Broker.FollowerReplicationThrottledRateProp, DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
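To make the distinction in the comment above concrete, here is a minimal Java sketch of an allow-list check for broker configs that may still be changed through ZooKeeper while the broker is running. The class and method names are hypothetical, and the property-name strings are assumed to be the values behind the three `DynamicConfig.Broker` constants quoted above; this is not the ConfigCommand implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch only; class and method names are hypothetical, not Kafka's.
final class ZkBrokerConfigCheck {
    // Assumed property names of the three broker-level throttle configs named in the review comment.
    static final Set<String> UPDATABLE_VIA_ZK_WHILE_RUNNING = Set.of(
        "leader.replication.throttled.rate",
        "follower.replication.throttled.rate",
        "replica.alter.log.dirs.io.max.bytes.per.second");

    // Returns the requested config keys that are NOT on the allow-list (sorted for stable output).
    static Set<String> rejectedKeys(Map<String, String> requested) {
        Set<String> rejected = new TreeSet<>(requested.keySet());
        rejected.removeAll(UPDATABLE_VIA_ZK_WHILE_RUNNING);
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedKeys(Map.of(
            "leader.replication.throttled.rate", "1000000",
            "log.cleaner.threads", "2")));
    }
}
```

All three allowed keys are throttle rates that live under the "broker" entity type, which is why the word "quota-related" in the code comment does not mean the "quota" config type.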
[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config
showuon commented on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-871132339 @ijuma , I've updated the PR. Please take a look again. Thank you. Failed tests are unrelated. ``` Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions() Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions() ```
[GitHub] [kafka] showuon commented on a change in pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config
showuon commented on a change in pull request #10811: URL: https://github.com/apache/kafka/pull/10811#discussion_r661161241 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -68,14 +68,13 @@ object ConfigCommand extends Config { val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType - val ZkSupportedConfigTypes = ConfigType.all + val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker) val DefaultScramIterations = 4096 // Dynamic broker configs can only be updated using the new AdminClient once brokers have started // so that configs may be fully validated. Prior to starting brokers, updates may be performed using // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK, // avoiding clear passwords in server.properties. For consistency with older versions, quota-related - // broker configs can still be updated using ZooKeeper at any time. ConfigCommand will be migrated - // to the new AdminClient later for these configs (KIP-248). Review comment: I've checked and confirmed that KIP-248 is rejected now. It won't be implemented. So, remove it.
[GitHub] [kafka] mjsax commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
mjsax commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r661136400 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs, if (beforeMs + afterMs < 0) { throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative."); } + +if (graceMs < 0) { +throw new IllegalArgumentException("Grace period must not be negative."); +} + this.afterMs = afterMs; this.beforeMs = beforeMs; this.graceMs = graceMs; this.enableSpuriousResultFix = enableSpuriousResultFix; } +/** + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, + * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than + * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to + * the duration specified by {@code afterWindowEnd} which means that out of order records arriving Review comment: `out-of-order` (with `-`) ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ## @@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs) * * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows. * Tumbling windows are a special case of hopping windows with {@code advance == size}. + * Using the method implicitly sets the grace period to zero which means + * that out of order records arriving after the window end will be dropped * * @param size The size of the window - * @return a new window definition with default maintain duration of 1 day + * @return a new window definition with default no grace period. 
Note that this means out of order records arriving after the window end will be dropped + * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds} + */ +public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException { +return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD)); +} + +/** + * Return a window definition with the given window size, and with the advance interval being equal to the window + * size. + * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}. + * + * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows. + * Tumbling windows are a special case of hopping windows with {@code advance == size}. + * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means + * that out of order records arriving after the window end will be dropped. + * + * + * Delay is defined as (stream_time - record_timestamp). + * + * @param size The size of the window. Must be larger than zero + * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative. + * @return a TimeWindows object with the specified size and the specified grace period + * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds} + */ +public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd) +throws IllegalArgumentException { Review comment: nit: move the previous line? 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs, if (beforeMs + afterMs < 0) { throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative."); } + +if (graceMs < 0) { +throw new IllegalArgumentException("Grace period must not be negative."); +} + this.afterMs = afterMs; this.beforeMs = beforeMs; this.graceMs = graceMs; this.enableSpuriousResultFix = enableSpuriousResultFix; } +/** + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, + * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than + * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to + * the duration specified by {@code afterWindowEnd} which means that out of order records arriving + * after the window end will be dropped
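The grace-period semantics discussed in this review can be sketched in a few lines of Java. The class below is a hypothetical stand-in, not the Kafka Streams `TimeWindows` class; it only borrows the two factory names and the validation messages from the quoted diff, and it assumes tumbling windows and non-negative timestamps.

```java
import java.time.Duration;

// Hypothetical stand-in for a tumbling-window definition; not the Kafka Streams class.
final class GraceWindowSketch {
    final long sizeMs;
    final long graceMs;

    private GraceWindowSketch(long sizeMs, long graceMs) {
        if (sizeMs <= 0) throw new IllegalArgumentException("Window size must be larger than zero.");
        if (graceMs < 0) throw new IllegalArgumentException("Grace period must not be negative.");
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // No grace: a record is dropped as soon as stream time reaches the end of its window.
    static GraceWindowSketch ofSizeWithNoGrace(Duration size) {
        return ofSizeAndGrace(size, Duration.ZERO);
    }

    static GraceWindowSketch ofSizeAndGrace(Duration size, Duration afterWindowEnd) {
        return new GraceWindowSketch(size.toMillis(), afterWindowEnd.toMillis());
    }

    // A record is still accepted while stream time has not reached windowEnd + grace.
    boolean accepts(long recordTimestampMs, long streamTimeMs) {
        long windowStart = (recordTimestampMs / sizeMs) * sizeMs; // window is [N * size, N * size + size)
        long windowEnd = windowStart + sizeMs;
        return streamTimeMs < windowEnd + graceMs;
    }
}
```

With a 10-second window, a record stamped 5,000 ms that arrives when stream time is 12,000 ms is dropped under `ofSizeWithNoGrace`, but accepted when the grace period is 5 seconds.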
[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
mjsax commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r661100306 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -938,6 +938,9 @@ // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847) public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__"; +// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847) +public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__"; Review comment: Good catch!
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661073496 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: I can move this cast to the `handle...` call and return a KafkaFuture if it makes things cleaner.
[jira] [Commented] (KAFKA-12660) Do not update offset commit sensor after append failure
[ https://issues.apache.org/jira/browse/KAFKA-12660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371766#comment-17371766 ] dengziming commented on KAFKA-12660: [~kirktrue] Thank you for your attention, but I have already opened a PR for this issue: https://github.com/apache/kafka/pull/10560. Are you interested in reviewing it? > Do not update offset commit sensor after append failure > --- > > Key: KAFKA-12660 > URL: https://issues.apache.org/jira/browse/KAFKA-12660 > Project: Kafka > Issue Type: Bug > Reporter: Jason Gustafson > Assignee: dengziming > Priority: Major > > In the append callback after writing an offset to the log in > `GroupMetadataManager`, it seems wrong to update the offset commit sensor > prior to checking for errors: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
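The ordering problem described in the issue can be illustrated with a small Java sketch: the metric is recorded only after the append result has been checked. Every name here is hypothetical (a counter stands in for the sensor, a nullable string for the error), so this shows the idea only, not the `GroupMetadataManager` code or the fix in the linked PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names throughout; this is not GroupMetadataManager.
final class OffsetCommitCallbackSketch {
    // Stands in for the offset commit sensor.
    private final AtomicLong offsetCommits = new AtomicLong();

    // Invoked once the log append finishes; appendError is null on success.
    void onAppendComplete(String appendError, int committedOffsets) {
        if (appendError != null) {
            // Failed append: leave the metric untouched.
            return;
        }
        // Successful append: only now count the committed offsets.
        offsetCommits.addAndGet(committedOffsets);
    }

    long recordedCommits() {
        return offsetCommits.get();
    }
}
```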
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661068251 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: Many other admin client calls also follow this pattern of using KafkaFutureImpl and copying the result to get KafkaFutures.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661068251 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: All other admin client calls also follow this pattern of using KafkaFutureImpl and copying the result to get KafkaFutures.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661067320 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: Hmmm. When I try to remove the copy, I get `'ofTopicIds(java.util.Map>)' in 'org.apache.kafka.clients.admin.DeleteTopicsResult' cannot be applied to '(java.util.Map>)'`
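The type parameters in the error message above were lost in the archive, so the following is only one plausible reading, based on the neighbouring comment about `KafkaFutureImpl` and `KafkaFuture`: Java generics are invariant, so a map of the concrete future type is not a map of the interface type, while the `HashMap` copy constructor accepts it. The sketch uses JDK types (`CompletableFuture`, `CompletionStage`) as stand-ins, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Stand-in types: CompletableFuture plays the concrete future, CompletionStage the interface.
final class InvarianceSketch {
    // Plays the role of a helper that returns a map of the concrete future type.
    static Map<String, CompletableFuture<Void>> handle() {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        futures.put("topic-a", CompletableFuture.completedFuture(null));
        return futures;
    }

    // Plays the role of a factory that wants a map of the interface type.
    static int accept(Map<String, CompletionStage<Void>> futures) {
        return futures.size();
    }

    static int demo() {
        // accept(handle());  // does not compile: Map<String, CompletableFuture<Void>>
        //                    // is not a Map<String, CompletionStage<Void>>
        // Compiles: the copy constructor takes Map<? extends K, ? extends V>.
        return accept(new HashMap<>(handle()));
    }
}
```

Under that reading, the copy is what makes the call type-check, which would explain why removing it fails to compile.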
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661066009 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,56 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified."); +this.topicIdFutures = topicIdFutures; +this.nameFutures = nameFutures; } +protected static DeleteTopicsResult ofTopicIds(Map> topicIdFutures) { Review comment: Forgive my ignorance, but is that not the same thing? Does package-private just mean that subclasses outside the package can't use it? If that's the case, then sure.
[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager
showuon commented on pull request #10838: URL: https://github.com/apache/kafka/pull/10838#issuecomment-871028414 Failed tests are unrelated. Thank you. ``` Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics() Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics() Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions() Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics() ```
[GitHub] [kafka] showuon commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
showuon commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r661064592 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -938,6 +938,9 @@ // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847) public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__"; +// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847) +public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__"; Review comment: Should we add underscores in front of the config name, like the ones above? i.e. `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661064384 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: This was an artifact of the old version of the code. We would create a copy of the map ``` return new DeleteTopicsResult(new HashMap<>(topicFutures)); ``` I can remove it, though.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661062996 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,56 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified."); +this.topicIdFutures = topicIdFutures; +this.nameFutures = nameFutures; } +protected static DeleteTopicsResult ofTopicIds(Map> topicIdFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(topicIdFutures, null); +return result; +} + +protected static DeleteTopicsResult ofTopicNames(Map> nameFutures) { +DeleteTopicsResult result = new DeleteTopicsResult(null, nameFutures); Review comment: Yes. This was left over from before and I didn't clean up enough.
[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
jolshan commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661062826 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,56 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) Review comment: good point
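The quoted constructor check rejects the case where both maps are set but would let both be null through. One way to require exactly one of the two is sketched below. The class is a hypothetical holder with simplified types, not `DeleteTopicsResult`, and the exception message is made up for the example.

```java
import java.util.Map;

// Hypothetical holder; mirrors the shape of the constructor under review, not its final form.
final class ExactlyOneMapSketch {
    final Map<String, ?> byId;
    final Map<String, ?> byName;

    ExactlyOneMapSketch(Map<String, ?> byId, Map<String, ?> byName) {
        // (byId == null) == (byName == null) is true when both are null or both are set.
        if ((byId == null) == (byName == null)) {
            throw new IllegalArgumentException("Exactly one of byId and byName must be specified.");
        }
        this.byId = byId;
        this.byName = byName;
    }
}
```

Comparing the two null checks covers both invalid combinations with a single condition.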
[GitHub] [kafka] hachikuji commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API
hachikuji commented on a change in pull request #10892: URL: https://github.com/apache/kafka/pull/10892#discussion_r661058030 ## File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.Collection; +import java.util.Collections; + +/** + * A class used to represent a collection of topics. This collection may define topics by topic name + * or topic ID. Subclassing this class beyond the classes provided here is not supported. + */ +public abstract class TopicCollection { + +private TopicCollection() {} + +/** + * @return a collection of topics defined by topic ID + */ +public static TopicIdCollection ofTopicIds(Collection topics) { +return new TopicIdCollection(topics); Review comment: Since the collection here comes from the user, maybe we should make a copy. Otherwise, the application could mutate it while we have a reference. 
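The defensive copy suggested in the comment above could look roughly like this. The class is a hypothetical stand-in that uses `String` ids instead of `Uuid`; it is a sketch of the idea, not the `TopicCollection` code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for a user-facing collection wrapper.
final class TopicIdsSketch {
    private final Collection<String> topicIds;

    private TopicIdsSketch(Collection<String> topicIds) {
        // Copy first, then wrap: later changes to the caller's collection are not visible here,
        // and callers of topicIds() cannot modify the internal copy.
        this.topicIds = Collections.unmodifiableList(new ArrayList<>(topicIds));
    }

    static TopicIdsSketch ofTopicIds(Collection<String> topicIds) {
        return new TopicIdsSketch(topicIds);
    }

    Collection<String> topicIds() {
        return topicIds;
    }
}
```

Wrapping the caller's collection with `Collections.unmodifiableCollection` alone would not be enough, because the wrapper is a view and still reflects later mutations of the original.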
## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,56 @@ */ @InterfaceStability.Evolving public class DeleteTopicsResult { -final Map> futures; +private final Map> topicIdFutures; +private final Map> nameFutures; -protected DeleteTopicsResult(Map> futures) { -this.futures = futures; +protected DeleteTopicsResult(Map> topicIdFutures, Map> nameFutures) { +if (topicIdFutures != null && nameFutures != null) Review comment: This allows both of them to be null? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) { } @Override -public DeleteTopicsResult deleteTopics(final Collection topicNames, +public DeleteTopicsResult deleteTopics(final TopicCollection topics, final DeleteTopicsOptions options) { +DeleteTopicsResult result; +if (topics instanceof TopicIdCollection) +result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); Review comment: Why do we copy the result of `handleDeleteTopicsUsingIds`? Seems like that method is already returning a fresh map. ## File path: clients/src/test/java/org/apache/kafka/clients/admin/DeleteTopicsResultTest.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.Map; + +public class DeleteTopicsResultTest { + +@Test +public void testDeleteTopicsResult() { Review comment: nit: would it make sense to turn this into two separate tests? One for ids and one for names? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ## @@ -30,24 +31,56 @@ */ @InterfaceStability.Evolving public c
[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
guozhangwang commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r661055137 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -183,15 +195,27 @@ public void process(final K key, final V1 value) { @SuppressWarnings("unchecked") private void emitNonJoinedOuterRecords(final WindowStore, LeftOrRightValue> store) { +if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) { Review comment: nit: add some explanations on these two conditions?
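As a rough illustration of what such explanations might say, the Java sketch below spells out two early-exit checks. Only the first condition appears in the quoted diff; the second (a wall-clock throttle) is an assumption inferred from the PR's emit-interval config, and all names other than `minTime`, `streamTime`, `joinAfterMs` and `joinGraceMs` are hypothetical.

```java
// Hypothetical sketch; field names follow the quoted diff, everything else is assumed.
final class OuterJoinEmitGuardSketch {
    private final long joinAfterMs;
    private final long joinGraceMs;
    private final long emitIntervalMs;   // assumed throttle interval

    long minTime = Long.MAX_VALUE;       // earliest timestamp among buffered, not-yet-joined records
    long streamTime = 0L;                // highest record timestamp observed so far
    private long nextTimeToEmit = 0L;    // wall-clock time before which no scan is attempted

    OuterJoinEmitGuardSketch(long joinAfterMs, long joinGraceMs, long emitIntervalMs) {
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
        this.emitIntervalMs = emitIntervalMs;
    }

    // Returns true only when scanning the store could actually produce results.
    boolean shouldScanStore(long wallClockMs) {
        // Condition 1 (from the diff): even the oldest buffered record is still inside
        // its join window plus grace period, so nothing can be emitted yet.
        if (minTime >= streamTime - joinAfterMs - joinGraceMs) {
            return false;
        }
        // Condition 2 (assumed): scan at most once per emit interval.
        if (wallClockMs < nextTimeToEmit) {
            return false;
        }
        nextTimeToEmit = wallClockMs + emitIntervalMs;
        return true;
    }
}
```

Both checks are cheap comparisons, so they can run on every record without touching the store.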
[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`
mjsax commented on a change in pull request #10813: URL: https://github.com/apache/kafka/pull/10813#discussion_r661045444 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque public Serde defaultKeySerde() { final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); try { +if (keySerdeConfigSetting == null) { +return null; Review comment: It's been a while, but I am still wondering whether we should throw a `ConfigException` directly instead of returning `null`. That would make this the only place in the code where we throw a `ConfigException` for this case. Below, there is some repetitive code that calls `defaultKeySerde` (or `defaultValueSerde`) and throws `ConfigException` if `null` is returned. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java ## @@ -69,6 +70,9 @@ Optional.empty() ); } catch (final Exception deserializationException) { +if (deserializationException instanceof ConfigException) { Review comment: Where does the `ConfigException` come from? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java ## @@ -27,20 +31,37 @@ public class WrappingNullableUtils { @SuppressWarnings("unchecked") -private static Deserializer prepareDeserializer(final Deserializer specificDeserializer, final Deserializer contextKeyDeserializer, final Deserializer contextValueDeserializer, final boolean isKey) { -Deserializer deserializerToUse = specificDeserializer; -if (deserializerToUse == null) { +private static Deserializer prepareDeserializer(final Deserializer specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) { +final Deserializer contextKeyDeserializer = context.keySerde() == null ? 
null : context.keySerde().deserializer(); +final Deserializer contextValueDeserializer = context.valueSerde() == null ? null : context.valueSerde().deserializer(); +final Deserializer deserializerToUse; + +if (specificDeserializer == null) { deserializerToUse = (Deserializer) (isKey ? contextKeyDeserializer : contextValueDeserializer); +} else { +deserializerToUse = specificDeserializer; +} +if (deserializerToUse == null) { +final String serde = isKey ? "key" : "value"; +throw new ConfigException("Failed to create deserializers. Please specify a " + serde + " serde through produced or materialized, or set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) + "_SERDE_CLASS_CONFIG for node " + name); Review comment: Following the comments from above. If we throw in `context.keySerde()` we can avoid this redundant code. Of course, we should only call `context.keySerde()` if `specificDeserializer == null` for this case. Similar below. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -1445,6 +1447,9 @@ public Serde defaultKeySerde() { public Serde defaultValueSerde() { final Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); try { +if (valueSerdeConfigSetting == null) { +return null; Review comment: As above ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java ## @@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer inner) { @Override public void setIfUnset(final Deserializer defaultKeyDeserializer, final Deserializer defaultValueDeserializer) { Review comment: Could we pass the context instead of both deserializers, and simplify to (relying on the context to throw if necessary): ``` if (inner == null) { inner = context.valueSerde(); } ``` ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java ## @@ -54,10 +55,14 @@ public CombinedKeySchema(final Supplier 
foreignKeySerdeTopicSupplier, public void init(final ProcessorContext context) { primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get(); foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get(); -primaryKeySerializer = primaryKeySerializer == null ? (Serializer) context.keySerde().serializer() : primaryKeySerializer; -primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : primaryKeyDeserializer; -foreignKeySerializer = foreignKeySerializer == null ? (Serializer) context.keySerde().serializer()
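A minimal sketch of the idea raised in the review comments above: throw from the config accessor in one place instead of returning `null` and repeating the check in every caller. `SketchConfigException`, `SketchStreamsConfig`, `keySerdeOrDefault`, and the config key are hypothetical stand-ins invented for illustration, not Kafka classes; the real `StreamsConfig` and `ConfigException` are more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's ConfigException so the sketch compiles on its own.
class SketchConfigException extends RuntimeException {
    SketchConfigException(final String message) {
        super(message);
    }
}

// Hypothetical, heavily simplified stand-in for StreamsConfig.
class SketchStreamsConfig {
    // Placeholder key used only by this sketch.
    static final String DEFAULT_KEY_SERDE = "sketch.default.key.serde";

    private final Map<String, Object> values = new HashMap<>();

    SketchStreamsConfig set(final String key, final Object value) {
        values.put(key, value);
        return this;
    }

    // Throws here, once, rather than returning null to every caller.
    Object defaultKeySerde() {
        final Object setting = values.get(DEFAULT_KEY_SERDE);
        if (setting == null) {
            throw new SketchConfigException(
                "Please specify a key serde or set one through " + DEFAULT_KEY_SERDE);
        }
        return setting;
    }

    // The default is consulted only when no specific serde was supplied,
    // so an unset default is harmless if the caller brought its own serde.
    Object keySerdeOrDefault(final Object specificSerde) {
        return specificSerde != null ? specificSerde : defaultKeySerde();
    }
}
```

With this shape, callers such as a deserializer wrapper would not need their own `null` check; they would only have to avoid touching the default when a specific serde is present.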
[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked to load snapshot
zhaohaidao commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r661038035 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -157,13 +160,18 @@ public synchronized void handleSnapshot(SnapshotReader reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", -committed, newLeader); +committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } +handleSnapshotCalls = 0; +} + +public int getHandleSnapshotCalls() { Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked to load snapshot
zhaohaidao commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r661037975 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -1014,6 +1015,26 @@ public void verify() { } } +private static class LeaderNeverLoadSnapshot implements Invariant { +final Cluster cluster; + +private LeaderNeverLoadSnapshot(Cluster cluster) { +this.cluster = cluster; +} + +@Override +public void verify() { +for (RaftNode raftNode : cluster.running()) { +if (raftNode.counter.isWritable()) { +assertTrue(raftNode.counter.getHandleSnapshotCalls() == 0); Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r661028655 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) { statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); } +public void onRestart(String connector) { Review comment: @kkonstantine I moved it to the Listener interface in https://github.com/apache/kafka/pull/10822/commits/7f8a588aaba9f1f80889e0429b2b9f495b1c666b. Could you please check whether it looks good?
[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
mjsax commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r661027703 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -186,12 +190,17 @@ public void process(final K key, final V1 value) { @SuppressWarnings("unchecked") private void emitNonJoinedOuterRecords(final WindowStore, LeftOrRightValue> store) { +if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) { Review comment: That's a good point, and I was concerned about it, too. Using `beforeMs` might increase the latency too much, especially for large windows. It might actually be best to just do processing-time periodic emits, to avoid calling `store.all()` too often.
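The processing-time periodic emit mentioned in the comment above could look roughly like the following. This is only a sketch under assumptions: `PeriodicEmitThrottle` and `shouldEmit` are hypothetical names, and the caller is assumed to pass in a wall-clock timestamp and to run the expensive scan (such as `store.all()`) only when the method returns `true`.

```java
// Hypothetical helper: allows an expensive scan at most once per wall-clock interval.
class PeriodicEmitThrottle {
    private final long intervalMs;
    private long lastEmitMs;
    private boolean emittedOnce = false;

    PeriodicEmitThrottle(final long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
    }

    // Returns true if the caller should run the expensive scan now.
    // The first call always returns true; later calls return true only
    // once at least intervalMs has passed since the last accepted call.
    boolean shouldEmit(final long nowMs) {
        if (emittedOnce && nowMs - lastEmitMs < intervalMs) {
            return false;
        }
        lastEmitMs = nowMs;
        emittedOnce = true;
        return true;
    }
}
```

The trade-off is that emission latency becomes bounded by the chosen interval rather than by the window size, at the cost of results no longer being driven purely by stream time.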
[GitHub] [kafka] kkonstantine commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kkonstantine commented on pull request #10822: URL: https://github.com/apache/kafka/pull/10822#issuecomment-870979782 fyi, a test failure seems relevant: `testCORSEnabled – org.apache.kafka.connect.runtime.rest.RestServerTest` I don't remember this test being flaky and it failed in both builders.
[GitHub] [kafka] cmccabe merged pull request #10942: MINOR: Move ZkMetadataCache into its own file.
cmccabe merged pull request #10942: URL: https://github.com/apache/kafka/pull/10942
[GitHub] [kafka] cmccabe opened a new pull request #10942: MINOR: Move ZkMetadataCache into its own file.
cmccabe opened a new pull request #10942: URL: https://github.com/apache/kafka/pull/10942
[GitHub] [kafka] cmccabe merged pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe merged pull request #10887: URL: https://github.com/apache/kafka/pull/10887
[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy
ryannedolan commented on a change in pull request #10652: URL: https://github.com/apache/kafka/pull/10652#discussion_r661007855 ## File path: docs/upgrade.html ## @@ -80,7 +80,13 @@ Notable changes in 3 understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg";>KIP-732 for more details. - + The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. Review comment: @mimaison @mdedetrich wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12660) Do not update offset commit sensor after append failure
[ https://issues.apache.org/jira/browse/KAFKA-12660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371669#comment-17371669 ] Kirk True commented on KAFKA-12660: --- [~dengziming] - can I take a look at this? I have some spare cycles to investigate and hopefully fix. > Do not update offset commit sensor after append failure > --- > > Key: KAFKA-12660 > URL: https://issues.apache.org/jira/browse/KAFKA-12660 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > > In the append callback after writing an offset to the log in > `GroupMetadataManager`, It seems wrong to update the offset commit sensor > prior to checking for errors: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #10941: KAFKA-10847: Remove internal config for enabling the fix
guozhangwang commented on pull request #10941: URL: https://github.com/apache/kafka/pull/10941#issuecomment-870917133 ping @mjsax @spena .
[GitHub] [kafka] guozhangwang opened a new pull request #10941: KAFKA-10847: Remove internal config for enabling the fix
guozhangwang opened a new pull request #10941: URL: https://github.com/apache/kafka/pull/10941 Also update the upgrade guide to mention the grace period KIP and what it implies for the fix, including the throughput impact. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] izzyacademy commented on pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on pull request #10926: URL: https://github.com/apache/kafka/pull/10926#issuecomment-870911243 @showuon @ableegoldman @mjsax @cadonna I have updated the PR with the requested changes. Please take a look when you have a moment. Thanks.
[GitHub] [kafka] mimaison commented on pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
mimaison commented on pull request #10743: URL: https://github.com/apache/kafka/pull/10743#issuecomment-870910353 @dajac Thanks, I opened JIRAs for all follow up work items we identified.
[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
mimaison commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r660956483 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java ## @@ -250,6 +252,13 @@ public void onFailure( .filter(future.lookupKeys()::contains) .collect(Collectors.toSet()); retryLookup(keysToUnmap); + +} else if (t instanceof NoBatchedFindCoordinatorsException) { Review comment: I opened https://issues.apache.org/jira/browse/KAFKA-13013
[jira] [Created] (KAFKA-13013) Avoid specific exception handling in AdminApiDriver
Mickael Maison created KAFKA-13013: -- Summary: Avoid specific exception handling in AdminApiDriver Key: KAFKA-13013 URL: https://issues.apache.org/jira/browse/KAFKA-13013 Project: Kafka Issue Type: Improvement Components: admin Reporter: Mickael Maison To enable downconverting requests when brokers don't support FindCoordinator v4, we added some logic in AdminApiDriver to handle NoBatchedFindCoordinatorsException. Ideally AdminApiDriver should not be aware of specific exceptions like this and instead delegate the handling to the corresponding strategy. Another option is to not rely on an Exception to identify the request version to use and use ApiVersions when creating the initial request. A good solution would work nicely with all types of clients (consumer, producer, admin). More details in: - https://github.com/apache/kafka/pull/10743#discussion_r659602262 - https://github.com/apache/kafka/pull/10743#discussion_r649872433 -- This message was sent by Atlassian Jira (v8.3.4#803005)
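One way to picture the second option in the issue above (deciding the request shape up front from the broker's supported version instead of reacting to an exception) is the sketch below. Everything here is hypothetical: `FindCoordinatorPlanSketch`, `planRequests`, and the way the broker's maximum version is supplied are invented for illustration and are not the `ApiVersions` or `AdminApiDriver` API. The only detail taken from the issue text is that batching requires FindCoordinator v4.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical planner: chooses batched or per-key lookups before sending anything.
class FindCoordinatorPlanSketch {
    // Per the issue text, brokers that do not support v4 need downconverted requests.
    static final short FIRST_BATCHED_VERSION = 4;

    // Returns one inner list per request to send: a single batch when the
    // broker supports batching, otherwise one key per request.
    static List<List<String>> planRequests(final List<String> groupIds, final short brokerMaxVersion) {
        final List<List<String>> requests = new ArrayList<>();
        if (groupIds.isEmpty()) {
            return requests;
        }
        if (brokerMaxVersion >= FIRST_BATCHED_VERSION) {
            requests.add(new ArrayList<>(groupIds));
        } else {
            for (final String groupId : groupIds) {
                requests.add(Collections.singletonList(groupId));
            }
        }
        return requests;
    }
}
```

Because the decision depends only on a version number, the same planner could in principle serve consumer, producer, and admin clients, which is the property the issue asks for.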
[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r660953683 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) { afterMs = joinWindows.afterMs; graceMs = joinWindows.graceMs; enableSpuriousResultFix = joinWindows.enableSpuriousResultFix; + +if (beforeMs + afterMs < 0) { +throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative."); Review comment: > Could we make this Constructor to call another overloaded constructor, to avoid duplicated codes? i.e. > > ```java > protected JoinWindows(final JoinWindows joinWindows) { > this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix); > } > ``` This is done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r660953456 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs, if (beforeMs + afterMs < 0) { throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative."); } + +if (graceMs < 0) { +throw new IllegalArgumentException("Grace period must not be negative."); +} + this.afterMs = afterMs; this.beforeMs = beforeMs; this.graceMs = graceMs; this.enableSpuriousResultFix = enableSpuriousResultFix; } +/** + * Reject out-of-order events that are delayed more than {@code afterWindowEnd} + * after the end of its window. + * + * Delay is defined as (stream_time - record_timestamp). + * + * @param timeDifference join window interval + * @param afterWindowEnd The grace period to admit out-of-order events to a window. + * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + * @return A new JoinWindows object with the specified window definition and grace period + */ public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) { return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true); } +/** + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, + * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than + * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero + * which means that out of order records arriving after the window end will be dropped. Review comment: Upon review, I have updated the javadocs, but I don't think they can be identical in wording. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
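The validation and factory pattern shown in the diff above can be sketched in isolation as follows. `JoinWindowSketch` is a hypothetical, stripped-down class, not the real `JoinWindows`; the factory name `withNoGrace` is an assumption made for this sketch, chosen only to illustrate the "grace period implicitly set to zero" behaviour described in the javadoc.

```java
import java.time.Duration;

// Hypothetical, simplified stand-in for JoinWindows.
class JoinWindowSketch {
    final long beforeMs;
    final long afterMs;
    final long graceMs;

    // All validation lives in one constructor so every factory shares it.
    private JoinWindowSketch(final long beforeMs, final long afterMs, final long graceMs) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Symmetric window of timeDifference on both sides, with an explicit grace period.
    static JoinWindowSketch ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
        return new JoinWindowSketch(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis());
    }

    // Hypothetical name: same window, grace period fixed at zero.
    static JoinWindowSketch withNoGrace(final Duration timeDifference) {
        return ofTimeDifferenceAndGrace(timeDifference, Duration.ZERO);
    }
}
```

Routing every factory through the single validating constructor is also what the related review suggestion about calling an overloaded constructor aims for: the negative-value checks exist exactly once.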
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660953271 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) { statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); } +public void onRestart(String connector) { Review comment: @kkonstantine you are right. Do you mind if we do this work in a follow-up PR? The reason I ask is that if we add it to the listener, it becomes part of the interface, and this would require me to retrofit the listener event into the old restartTask and restartConnector API for backward compatibility reasons, which would be a big change to this already big PR.
[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
mimaison commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r660952842 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler> { + +private final CoordinatorKey groupId; +private final Map offsets; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public AlterConsumerGroupOffsetsHandler( +String groupId, +Map offsets, +LogContext logContext +) { +this.groupId = CoordinatorKey.byGroupId(groupId); +this.offsets = offsets; +this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class); +this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); +} + +@Override +public String apiName() { +return "offsetCommit"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return lookupStrategy; +} + +public static AdminApiFuture.SimpleAdminApiFuture> newFuture( +String groupId +) { 
+return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); +} + +@Override +public OffsetCommitRequest.Builder buildRequest(int brokerId, Set keys) { +List topics = new ArrayList<>(); +Map> offsetData = new HashMap<>(); +for (Map.Entry entry : offsets.entrySet()) { +String topic = entry.getKey().topic(); +OffsetAndMetadata oam = entry.getValue(); +offsetData.compute(topic, (key, value) -> { +if (value == null) { +value = new ArrayList<>(); +} +OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() +.setCommittedOffset(oam.offset()) +.setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) +.setCommittedMetadata(oam.metadata()) +.setPartitionIndex(entry.getKey().partition()); +value.add(partition); +return value; +}); +} +for (Map.Entry> entry : offsetData.entrySet()) { +OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic() +.setName(entry.getKey()) +.setPartitions(entry.getValue()); +topics.add(topic); +} +OffsetCommitRequestData data = new OffsetCommitRequestData() +.setGroupId(groupId.idValue) +.setTopics(topics); +return new OffsetCommitRequest.Builder(data); +} + +@Override +public ApiResult> handleResponse(Node broker, Set groupIds, +AbstractResponse abstractResponse) { + +final OffsetCommitResponse response = (Offset
[jira] [Created] (KAFKA-13012) Rationalize error handling of AdminClient calls
Mickael Maison created KAFKA-13012: -- Summary: Rationalize error handling of AdminClient calls Key: KAFKA-13012 URL: https://issues.apache.org/jira/browse/KAFKA-13012 Project: Kafka Issue Type: Improvement Components: admin Reporter: Mickael Maison While working on KIP-699, we noticed many AdminClient calls perform slightly different error handling. For example, all fulfilment calls following a coordinator lookup should handle coordinator/group errors the same way. Currently each Handler has its own handleError() method with subtle differences. More details in https://github.com/apache/kafka/pull/10743#discussion_r653151839 -- This message was sent by Atlassian Jira (v8.3.4#803005)
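The kind of shared handling the issue above asks for might be sketched as a single classification function that every handler calls, instead of each `handleError()` deciding on its own. This is an illustration only: `CoordinatorErrorSketch`, its two enums, and the mapping from error to action are assumptions made for this sketch, not the behaviour of any existing Kafka handler.

```java
// Hypothetical shared classifier for errors returned after a coordinator lookup.
class CoordinatorErrorSketch {
    // Illustrative subset of coordinator/group errors.
    enum CoordinatorError {
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
        GROUP_AUTHORIZATION_FAILED
    }

    // What a handler should do next.
    enum Action { RETRY_REQUEST, REDO_LOOKUP, FAIL }

    // Assumed mapping: retry while the coordinator loads, look the coordinator
    // up again if it moved or is unavailable, and fail on anything else.
    static Action classify(final CoordinatorError error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY_REQUEST;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.REDO_LOOKUP;
            default:
                return Action.FAIL;
        }
    }
}
```

With one function owning the mapping, the subtle per-handler differences the issue describes would have nowhere to creep in.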
[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r660952604 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java ## @@ -108,13 +147,12 @@ public static SessionWindows with(final Duration inactivityGap) { * @param afterWindowEnd The grace period to admit out-of-order events to a window. * @return this updated builder * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead */ +@Deprecated public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix); -if (afterWindowEndMs < 0) { -throw new IllegalArgumentException("Grace period must not be negative."); -} Review comment: Thanks for this feedback @showuon I have updated the code to share/reuse constructors. I think @ableegoldman will like that as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations
izzyacademy commented on a change in pull request #10926: URL: https://github.com/apache/kafka/pull/10926#discussion_r660948497 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ## @@ -40,7 +40,11 @@ // By default grace period is 24 hours for all windows, // in other words we allow out-of-order data for up to a day -protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L; +// This behavior is now deprecated +protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L; Review comment: This is not a publicly available constant. It is only used internally by the implementers, so I think it is OK. The reasoning for deprecating the constant is explained thoroughly in the KIP, so I think they will understand. I can add more details to the Java comments to clarify. But thanks @showuon for bringing it up in the review.
[GitHub] [kafka] mimaison merged pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…
mimaison merged pull request #10221: URL: https://github.com/apache/kafka/pull/10221
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660935556 ## File path: core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala ## @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.admin.BrokerMetadata + +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ +import kafka.cluster.{Broker, EndPoint} +import kafka.api._ +import kafka.controller.StateChangeLogger +import kafka.utils.CoreUtils._ +import kafka.utils.Logging +import kafka.utils.Implicits._ +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} +import org.apache.kafka.common.security.auth.SecurityProtocol + +/** + * A cache for the state (e.g., current leader) of each partition. This cache is updated through + * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. + */ +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig, } else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition => - val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get -.brokerEndPoint(config.interBrokerListenerName) + val leaderNode = metadataCache.getAliveBrokerNode( +partition.leaderReplicaIdOpt.getOrElse(-1), config.interBrokerListenerName.value()). Review comment: Hmm... `getAliveBrokerNode` returns `None` or `Some`, never `noNode`. Anyway, I don't feel strongly about this, so I just used the version you suggested. I suppose it's one less place where we use -1 as a sentinel value (although honestly, there are enough of these places that -1 will never be a valid node value in Kafka, I think.)
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig, } else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition => - val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get -.brokerEndPoint(config.interBrokerListenerName) + val leaderNode = metadataCache.getAliveBrokerNode( +partition.leaderReplicaIdOpt.getOrElse(-1), config.interBrokerListenerName.value()). Review comment: `getAliveBrokerNode` returns `None` when a broker ID that is not alive is passed. The conversion to `NoNode` happens in `ReplicaManager`, not in the metadata cache. I guess I don't feel strongly about it, though, if you want a different version of this...?
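The split described in the two comments above (the cache lookup returns an optional value, and the caller decides whether to fall back to a sentinel node) can be sketched roughly as follows. This is only an illustration in Java, not the code from the PR: `SketchNode`, `AliveBrokerCacheSketch`, and `leaderOrNoNode` are hypothetical names, `SketchNode` is a stand-in for `org.apache.kafka.common.Node`, and the listener-name parameter is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.Node, reduced to what the sketch needs.
final class SketchNode {
    // Sentinel used only by callers that need a non-null node.
    static final SketchNode NO_NODE = new SketchNode(-1, "", -1);

    final int id;
    final String host;
    final int port;

    SketchNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical cache: the lookup never returns the sentinel, only an Optional.
final class AliveBrokerCacheSketch {
    private final Map<Integer, SketchNode> aliveNodes = new HashMap<>();

    void register(SketchNode node) {
        aliveNodes.put(node.id, node);
    }

    // Empty when the broker id is unknown or not alive.
    Optional<SketchNode> getAliveBrokerNode(int brokerId) {
        return Optional.ofNullable(aliveNodes.get(brokerId));
    }

    // Caller-side conversion: the leader id stays optional (no -1 sentinel for the id),
    // and only the final result is mapped to NO_NODE.
    static SketchNode leaderOrNoNode(AliveBrokerCacheSketch cache, Optional<Integer> leaderId) {
        return leaderId.flatMap(cache::getAliveBrokerNode).orElse(SketchNode.NO_NODE);
    }
}
```

Keeping the sentinel out of the cache means a new caller sees the "not alive" case in the return type instead of having to know about a magic value.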
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660925631 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] Review comment: Yes, let's just use `ListenerName`.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660925249 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlan(request); Review comment: @kkonstantine after your latest explanation it makes sense to rename it to keep it simple. Thanks for guiding me. I have renamed it.
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660911200 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: OK, I will just convert it back to an `Option[Int]`, then. Edit: that is more consistent with some of the other functions that return `Option` as well.
[jira] [Updated] (KAFKA-12996) OffsetOutOfRange not handled correctly for diverging epochs when fetch offset less than leader start offset
[ https://issues.apache.org/jira/browse/KAFKA-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-12996: --- Fix Version/s: 2.7.2 Affects Version/s: 2.7.1 > OffsetOutOfRange not handled correctly for diverging epochs when fetch offset > less than leader start offset > --- > > Key: KAFKA-12996 > URL: https://issues.apache.org/jira/browse/KAFKA-12996 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0, 2.7.1 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > {color:#24292e}If fetchOffset < startOffset, we currently throw > OffsetOutOfRangeException when attempting to read from the log in the regular > case. But for diverging epochs, we return Errors.NONE with the new leader > start offset, hwm etc.. ReplicaFetcherThread throws OffsetOutOfRangeException > when processing responses with Errors.NONE if the leader's offsets in the > response are out of range and this moves the partition to failed state. We > should add a check for this case when processing fetch requests and ensure > OffsetOutOfRangeException is thrown regardless of epochs.{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12996) OffsetOutOfRange not handled correctly for diverging epochs when fetch offset less than leader start offset
[ https://issues.apache.org/jira/browse/KAFKA-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-12996. Reviewer: Guozhang Wang Resolution: Fixed > OffsetOutOfRange not handled correctly for diverging epochs when fetch offset > less than leader start offset > --- > > Key: KAFKA-12996 > URL: https://issues.apache.org/jira/browse/KAFKA-12996 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0, 2.7.1 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > {color:#24292e}If fetchOffset < startOffset, we currently throw > OffsetOutOfRangeException when attempting to read from the log in the regular > case. But for diverging epochs, we return Errors.NONE with the new leader > start offset, hwm etc.. ReplicaFetcherThread throws OffsetOutOfRangeException > when processing responses with Errors.NONE if the leader's offsets in the > response are out of range and this moves the partition to failed state. We > should add a check for this case when processing fetch requests and ensure > OffsetOutOfRangeException is thrown regardless of epochs.{color}
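The check proposed in the issue description (reject a fetch offset below the leader's start offset before looking at epochs) might look roughly like the sketch below. It is not the actual fix: `FetchOffsetCheckSketch`, `handleFetch`, the string results, and `OffsetOutOfRangeSketchException` are hypothetical names chosen for illustration, and the real broker logic involves considerably more state.

```java
// Hypothetical exception standing in for Kafka's OffsetOutOfRangeException.
final class OffsetOutOfRangeSketchException extends RuntimeException {
    OffsetOutOfRangeSketchException(String message) {
        super(message);
    }
}

final class FetchOffsetCheckSketch {
    // Returns a label for the kind of response that would be built.
    // The range check runs first, so it applies whether or not epochs diverge.
    static String handleFetch(long fetchOffset, long logStartOffset, boolean epochsDiverge) {
        if (fetchOffset < logStartOffset) {
            throw new OffsetOutOfRangeSketchException(
                "Fetch offset " + fetchOffset + " is below log start offset " + logStartOffset);
        }
        // Only in-range fetches reach the diverging-epoch branch.
        return epochsDiverge ? "DIVERGING_EPOCH" : "RECORDS";
    }
}
```

With this ordering the follower sees the out-of-range error directly rather than discovering it while processing a response that carried no error.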
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660911200 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: OK, I will just convert it back to an `Option[Int]`, then.
[GitHub] [kafka] mumrah commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
mumrah commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660902180 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: I agree that the Option rather than a special value is better here. It's not a big deal either way for the current code since there aren't many usages, but in the future a new caller will have to know to check for `0`. Seems like it could be easy to introduce bugs. A longer term solution (beyond the scope of this PR) could be to have individual container objects for the topic-level metadata. This way we could easily encapsulate the "does this topic even exist" with an Option on the metadata cache. E.g., `metadataCache.topic("foo").map(_.numPartitions)`
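The container-object idea from the comment above, where topic existence is expressed once as an optional value on the cache, could be sketched like this. The sketch is in Java with `Optional` standing in for Scala's `Option`; `TopicMetadataSketch`, `TopicCacheSketch`, and `putTopic` are hypothetical names and not part of the Kafka code base.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical container for topic-level metadata.
final class TopicMetadataSketch {
    final int numPartitions;

    TopicMetadataSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }
}

// Hypothetical cache: "does this topic exist" is answered by the Optional itself.
final class TopicCacheSketch {
    private final Map<String, TopicMetadataSketch> topics = new HashMap<>();

    void putTopic(String name, int numPartitions) {
        topics.put(name, new TopicMetadataSketch(numPartitions));
    }

    // Empty when the topic is unknown.
    Optional<TopicMetadataSketch> topic(String name) {
        return Optional.ofNullable(topics.get(name));
    }

    // Derived accessor: no special value such as 0 for a missing topic.
    Optional<Integer> numPartitions(String name) {
        return topic(name).map(t -> t.numPartitions);
    }
}
```

A caller written against this shape has to handle the empty case explicitly, which is the property the reviewers were after when they argued against returning `0`.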
[jira] [Assigned] (KAFKA-13011) Update deleteTopics Admin API
[ https://issues.apache.org/jira/browse/KAFKA-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-13011: -- Assignee: Justine Olshan > Update deleteTopics Admin API > - > > Key: KAFKA-13011 > URL: https://issues.apache.org/jira/browse/KAFKA-13011 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Implement the new deleteTopics apis as described in the KIP.
[jira] [Resolved] (KAFKA-10764) Add support for returning topic IDs on create, supplying topic IDs for delete
[ https://issues.apache.org/jira/browse/KAFKA-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-10764. Resolution: Done Opened a new ticket for the change: https://issues.apache.org/jira/browse/KAFKA-13011 > Add support for returning topic IDs on create, supplying topic IDs for delete > - > > Key: KAFKA-10764 > URL: https://issues.apache.org/jira/browse/KAFKA-10764 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.0.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Update Admin Client methods and update protocols to support topic IDs > UPDATED: we just need to figure out the right API and re-implement > AdminClient deleteTopics.
[jira] [Created] (KAFKA-13011) Update deleteTopics Admin API
Justine Olshan created KAFKA-13011: -- Summary: Update deleteTopics Admin API Key: KAFKA-13011 URL: https://issues.apache.org/jira/browse/KAFKA-13011 Project: Kafka Issue Type: Sub-task Reporter: Justine Olshan Implement the new deleteTopics apis as described in the KIP.
[jira] [Commented] (KAFKA-10764) Add support for returning topic IDs on create, supplying topic IDs for delete
[ https://issues.apache.org/jira/browse/KAFKA-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371584#comment-17371584 ] Justine Olshan commented on KAFKA-10764: closed as I'm moving the admin client changes to this ticket: https://issues.apache.org/jira/browse/KAFKA-13011 > Add support for returning topic IDs on create, supplying topic IDs for delete > - > > Key: KAFKA-10764 > URL: https://issues.apache.org/jira/browse/KAFKA-10764 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.0.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Update Admin Client methods and update protocols to support topic IDs > UPDATED: we just need to figure out the right API and re-implement > AdminClient deleteTopics.
[GitHub] [kafka] hachikuji commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
hachikuji commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r660883703 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] Review comment: Would it make sense to use `ListenerName` in these APIs? As far as I can tell, all of the usages are just using `ListenerName.value`. Also, in `ZkMetadataCache`, we end up constructing a new `ListenerName` from the string. ## File path: core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala ## @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.admin.BrokerMetadata + +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ +import kafka.cluster.{Broker, EndPoint} +import kafka.api._ +import kafka.controller.StateChangeLogger +import kafka.utils.CoreUtils._ +import kafka.utils.Logging +import kafka.utils.Implicits._ +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} +import org.apache.kafka.common.security.auth.SecurityProtocol + +/** + * A cache for the state (e.g., current leader) of each partition. This cache is updated through + * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. + */ +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: I'm wondering if we could leave this class in its current place just for this PR. It is a little hard to see the diffs. In a trivial follow-up, we can factor it out. 
## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: An alternative might be to raise an exception if the topic does not exist? I agree with Jose that we are likely to have bugs resulting from the caller forgetting to handle the case of 0. ## File path: core/src/ma
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660880726 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -312,26 +315,142 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { -case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + /** + * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the + * following invariants all hold true: + * + * This is not the latest snapshot (i.e., another snapshot proceeds this one) + * The offset of the next snapshot is greater than the log start offset + * The log can be advanced to the offset of the next snapshot + * + * This method is not thread safe and assumes a lock on the snapshots collection is held + */ + private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = { +if (snapshots.contains(snapshotId) && +snapshots.contains(nextSnapshotId) && +startOffset < nextSnapshotId.offset && +snapshotId.offset < nextSnapshotId.offset && +log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) { + log.deleteOldSegments() + val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] + snapshots.remove(snapshotId) match { +case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot) +case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.") + } + removeSnapshots(forgotten) + true +} else { + false +} + } - // Delete all segments that have a "last offset" less than the log start offset - 
log.deleteOldSegments() + /** + * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe + */ + private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = { +snapshots.keys.toSeq.flatMap { + snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())} +} + } - // Forget snapshots less than the log start offset - (true, forgetSnapshotsBefore(logStartSnapshotId)) -case _ => - (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + /** + * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records + */ + private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { +readSnapshot(snapshotId).asScala.flatMap { reader => + val it = reader.records().batchIterator() + if (it.hasNext) { +Some(it.next.maxTimestamp()) + } else { +None } } + } -removeSnapshots(forgottenSnapshots) -deleted + /** + * Perform cleaning of old snapshots and log segments based on size. + * + * If our configured retention size has been violated, we perform cleaning as follows: + * + * Find oldest snapshot and delete it + * Advance log start offset to end of next oldest snapshot + * Delete log segments which wholly precede the new log start offset + * + * This process is repeated until the retention size is no longer violated, or until only + * a single snapshot remains. + */ + override def maybeClean(): Boolean = { +snapshots synchronized { + cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs() +} + } + + /** + * Iterate through the snapshots a test the given predicate to see if we should attempt to delete it. Since + * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in + * all cases. + * + * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. 
+ */ + private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = { +if (snapshots.size < 2) + return false; + +var didClean = false +snapshots.keys.toSeq.sliding(2).toSeq.takeWhile { + case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) => +if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) { + didClean = true + true +} else { + false +} + case _ => false // Shouldn't get here with sliding(2) +} +didClean + } Review comment: @jsancio how does this look? The `sliding(2)` will c
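The pairwise walk in `cleanSnapshots` above (visit adjacent snapshots oldest first, delete while the predicate holds, stop at the first refusal) can be illustrated with the following simplified Java sketch. It is an assumption-laden illustration rather than a port of the PR: snapshots are reduced to bare offsets, `SnapshotCleanerSketch` and its methods are hypothetical names, and the extra invariants enforced by `deleteSnapshot` (log start offset advancement, segment deletion) are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiPredicate;

// Hypothetical, simplified model: each snapshot is identified only by its offset.
final class SnapshotCleanerSketch {
    private final TreeSet<Long> snapshotOffsets = new TreeSet<>();

    void add(long offset) {
        snapshotOffsets.add(offset);
    }

    List<Long> remaining() {
        return new ArrayList<>(snapshotOffsets);
    }

    // shouldDelete receives (snapshot, nextSnapshot) and decides about the first one.
    boolean cleanSnapshots(BiPredicate<Long, Long> shouldDelete) {
        if (snapshotOffsets.size() < 2) {
            return false; // never delete the only snapshot
        }
        // Copy the sorted offsets so the set can be mutated while iterating.
        List<Long> ordered = new ArrayList<>(snapshotOffsets);
        boolean didClean = false;
        // Equivalent of sliding(2) followed by takeWhile: stop at the first pair that is kept.
        for (int i = 0; i + 1 < ordered.size(); i++) {
            long snapshot = ordered.get(i);
            long next = ordered.get(i + 1);
            if (!shouldDelete.test(snapshot, next)) {
                break;
            }
            snapshotOffsets.remove(snapshot);
            didClean = true;
        }
        return didClean;
    }
}
```

Because a snapshot is only ever considered as the first element of a pair, the latest snapshot is never a deletion candidate, which matches the "until only a single snapshot remains" wording in the `maybeClean` comment.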
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660879928 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -358,6 +485,7 @@ final class KafkaMetadataLog private ( expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] ): Unit = { expiredSnapshots.foreach { case (snapshotId, _) => + info(s"Marking snapshot $snapshotId for deletion") Review comment: I went ahead and added a logIdent to this class
[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660870565 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlan(request); Review comment: We might be too precise here. As we discussed elsewhere, baking the meaning of the type in every variable name might be too verbose. It's a plan, and the fact that it is optional means that there might be one or there might not. That's how I'd read it. The other use of `maybePlan` below is harder to avoid. Feel free to keep it consistent here with what you have below. Again, my point is not to bake type meaning in the variable names. I feel this keeps things simpler.
[jira] [Updated] (KAFKA-12992) Make kraft configuration properties public
[ https://issues.apache.org/jira/browse/KAFKA-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-12992: --- Labels: kip-500 (was: ) > Make kraft configuration properties public > -- > > Key: KAFKA-12992 > URL: https://issues.apache.org/jira/browse/KAFKA-12992 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Jose Armando Garcia Sancio >Assignee: HaiyuanZhao >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > All of the Kraft configurations should be made public: > {code:java} > /* > * KRaft mode configs. Note that these configs are defined as > internal. We will make them public in the 3.0.0 release. > */ > .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), > ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc) > .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, > NodeIdDoc) > .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, > Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, > InitialBrokerRegistrationTimeoutMsDoc) > .defineInternal(BrokerHeartbeatIntervalMsProp, INT, > Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, > BrokerHeartbeatIntervalMsDoc) > .defineInternal(BrokerSessionTimeoutMsProp, INT, > Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc) > .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, > MetadataLogDirDoc) > .defineInternal(ControllerListenerNamesProp, STRING, null, null, > HIGH, ControllerListenerNamesDoc) > .defineInternal(SaslMechanismControllerProtocolProp, STRING, > SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, > SaslMechanismControllerProtocolDoc) > {code} > > https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/KafkaConfig.scala#L1043-L1053
[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660868474 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord record) } +@SuppressWarnings("unchecked") +RestartRequest recordToRestartRequest(ConsumerRecord record, SchemaAndValue value) { +String connectorName = record.key().substring(RESTART_PREFIX.length()); +if (value.value() == null) { +log.error("Ignoring restart request because it is unexpectedly null"); +return null; +} +if (!(value.value() instanceof Map)) { +log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); +return null; +} + +Map valueAsMap = (Map) value.value(); + +Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); +if (failed == null) { +log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); Review comment: Didn't notice. Thanks. Makes sense then
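The parsing pattern visible in the diff above (ignore null or non-map values, and fall back to a default when a field is missing) can be sketched in isolation as follows. This is a hypothetical reduction, not the method from the PR: the class names and the two field-name strings are assumptions, the logging calls are dropped, and treating a non-Boolean field value like a missing one is also an assumption because the quoted diff is cut off before that case.

```java
import java.util.Map;

// Hypothetical value object for a parsed restart request.
final class RestartRequestSketch {
    final String connectorName;
    final boolean onlyFailed;
    final boolean includeTasks;

    RestartRequestSketch(String connectorName, boolean onlyFailed, boolean includeTasks) {
        this.connectorName = connectorName;
        this.onlyFailed = onlyFailed;
        this.includeTasks = includeTasks;
    }
}

final class RestartRecordParserSketch {
    static final String RESTART_PREFIX = "restart-connector-";
    // Field names are assumptions for this sketch; the quoted diff does not show them.
    static final String ONLY_FAILED_FIELD = "only-failed";
    static final String INCLUDE_TASKS_FIELD = "include-tasks";
    static final boolean ONLY_FAILED_DEFAULT = false;
    static final boolean INCLUDE_TASKS_DEFAULT = false;

    // Returns null when the record value is absent or not a map, mirroring the diff.
    static RestartRequestSketch parse(String key, Object value) {
        if (!(value instanceof Map)) {
            return null; // instanceof is false for null as well
        }
        String connectorName = key.substring(RESTART_PREFIX.length());
        Map<?, ?> valueAsMap = (Map<?, ?>) value;
        return new RestartRequestSketch(
            connectorName,
            booleanOrDefault(valueAsMap.get(ONLY_FAILED_FIELD), ONLY_FAILED_DEFAULT),
            booleanOrDefault(valueAsMap.get(INCLUDE_TASKS_FIELD), INCLUDE_TASKS_DEFAULT));
    }

    // Missing (or, by assumption, wrongly typed) fields fall back to the default.
    private static boolean booleanOrDefault(Object raw, boolean defaultValue) {
        return raw instanceof Boolean ? (Boolean) raw : defaultValue;
    }
}
```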
[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660867874 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) { statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); } +public void onRestart(String connector) { Review comment: Did you compare it with other methods? My understanding is that `onDeletion` is the same in that respect and exists on the `Listener` as well, but I might have skimmed through the code too quickly.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660866556 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -208,6 +209,21 @@ public static String COMMIT_TASKS_KEY(String connectorName) { .field("creation-timestamp", Schema.INT64_SCHEMA) .build(); +public static final String RESTART_PREFIX = "restart-connector-"; + +public static String RESTART_KEY(String connectorName) { +return RESTART_PREFIX + connectorName; +} + +public static final Boolean ONLY_FAILED_DEFAULT = Boolean.FALSE; +public static final Boolean INCLUDE_TASKS_DEFAULT = Boolean.FALSE; Review comment: fixed as per your suggestions.
[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on pull request #10822: URL: https://github.com/apache/kafka/pull/10822#issuecomment-870820269 @kkonstantine Thanks a lot for taking the time and posting the detailed review comments. I was able to resolve all of them except for a few where I left some additional comments. Please let me know how you want to proceed with them.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660864346 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) { statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); } +public void onRestart(String connector) { Review comment: I added the missing documentation. I didn't add the `onRestart` method to `Listener` because I didn't feel it should be part of the task lifecycle controlled by the worker. Earlier the method was called `recordRestart`, but based on a review comment it was renamed to `onRestart`. Do you think we should rename it back to `recordRestart`? The original intent of this method was just to record the state change.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix
guozhangwang commented on a change in pull request #10917: URL: https://github.com/apache/kafka/pull/10917#discussion_r660844998 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -186,12 +190,17 @@ public void process(final K key, final V1 value) { @SuppressWarnings("unchecked") private void emitNonJoinedOuterRecords(final WindowStore, LeftOrRightValue> store) { +if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) { Review comment: If we want to treat this as a trade-off between performance and emit latency, it may be better not to piggy-back on `beforeMs` and instead use a value that is independent of the window size.
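The suggestion above (throttle the scan with a value that does not depend on the window size) could look roughly like the following. This is only a sketch under stated assumptions: `EmitThrottleSketch`, `shouldEmit`, and `emitIntervalMs` are hypothetical names that do not appear in the PR, and the real join processor tracks more state than this.

```java
final class EmitThrottleSketch {
    // Hypothetical knob: chosen independently of joinBeforeMs/joinAfterMs/grace.
    private final long emitIntervalMs;
    // Earliest stream time at which the next scan is allowed.
    private long nextEmitTimeMs = 0L;

    EmitThrottleSketch(long emitIntervalMs) {
        this.emitIntervalMs = emitIntervalMs;
    }

    /** Returns true if a scan for non-joined records should run at this stream time. */
    boolean shouldEmit(long observedStreamTimeMs) {
        if (observedStreamTimeMs < nextEmitTimeMs) {
            return false; // too soon since the last scan, skip the store iteration
        }
        nextEmitTimeMs = observedStreamTimeMs + emitIntervalMs;
        return true;
    }
}
```

A larger interval means fewer store scans but a longer delay before non-joined records are emitted, which is the trade-off the comment refers to.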
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660839303 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -16,32 +16,37 @@ */ package kafka.raft -import java.io.File -import java.nio.file.{Files, NoSuchFileException, Path} -import java.util.{Optional, Properties} import kafka.api.ApiVersion import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal} +import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal} import kafka.utils.{CoreUtils, Logging, Scheduler} +import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import java.io.File +import java.nio.file.{Files, NoSuchFileException, Path} +import java.util.{Optional, Properties} import scala.annotation.nowarn import scala.collection.mutable import scala.compat.java8.OptionConverters._ final class KafkaMetadataLog private ( - log: Log, + val log: Log, + time: Time, scheduler: Scheduler, // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. 
- snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], + val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], Review comment: Fair enough. I need a way to get the number of snapshots from the test code. Maybe just a package private accessor like ```scala private[raft] def snapshotCount(): Int = { ... } ```
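For comparison, the same idea (expose only a count to tests, and take the lock inside the accessor rather than publishing the collection) can be sketched in Java. The class and method names here are hypothetical stand-ins, not the PR's code; the map types are simplified to `Long` and `String`.

```java
import java.util.TreeMap;

final class SnapshotRegistrySketch {
    // Guarded by its own monitor, mirroring the "snapshots synchronized { ... }" idiom in the diff.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long endOffset, String name) {
        synchronized (snapshots) {
            snapshots.put(endOffset, name);
        }
    }

    // Package-private: visible to tests in the same package, not part of the public surface.
    int snapshotCount() {
        synchronized (snapshots) {
            return snapshots.size();
        }
    }
}
```

Because callers never see the map itself, the synchronization requirement stays an implementation detail that can change later.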
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660837401 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -358,6 +485,7 @@ final class KafkaMetadataLog private ( expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] ): Unit = { expiredSnapshots.foreach { case (snapshotId, _) => + info(s"Marking snapshot $snapshotId for deletion") Review comment: I assumed the class included some common log info like that, but looks like it doesn't. I'll add it here and open a separate PR to add some basic fields to the logIdent.
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660836532 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -312,26 +313,152 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { -case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + /** + * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the + * following invariants all hold true: + * + * This is not the latest snapshot (i.e., another snapshot proceeds this one) + * The offset of the next snapshot is greater than the log start offset + * The log can be advanced to the offset of the next snapshot + * + * This method is not thread safe and assumes a lock on the snapshots collection is held + */ + private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = { +nextSnapshotIdOpt.exists { nextSnapshotId => + if (snapshots.contains(snapshotId) && + snapshots.contains(nextSnapshotId) && + startOffset < nextSnapshotId.offset && + snapshotId.offset < nextSnapshotId.offset && + log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) { +log.deleteOldSegments() +val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] +snapshots.remove(snapshotId) match { + case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot) + case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.") +} +removeSnapshots(forgotten) +true + } else { +false + } +} + } - // Delete all 
segments that have a "last offset" less than the log start offset - log.deleteOldSegments() + /** + * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe + */ + private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = { +snapshots.keys.toSeq.flatMap { + snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())} +} + } - // Forget snapshots less than the log start offset - (true, forgetSnapshotsBefore(logStartSnapshotId)) -case _ => - (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + /** + * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records + */ + private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { +readSnapshot(snapshotId).asScala.flatMap { reader => + val it = reader.records().batchIterator() + if (it.hasNext) { +Some(it.next.maxTimestamp()) + } else { +None } } + } -removeSnapshots(forgottenSnapshots) -deleted + /** + * Perform cleaning of old snapshots and log segments based on size. + * + * If our configured retention size has been violated, we perform cleaning as follows: + * + * Find oldest snapshot and delete it + * Advance log start offset to end of next oldest snapshot + * Delete log segments which wholly precede the new log start offset + * + * This process is repeated until the retention size is no longer violated, or until only + * a single snapshot remains. + */ + override def maybeClean(): Boolean = { +snapshots synchronized { + cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs() +} + } + + /** + * Iterate through the snapshots a test the given predicate to see if we should attempt to delete it. Since + * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in + * all cases. 
+ * + * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. + */ + private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = { +val snapshotIterator = snapshots.keys.iterator +var snapshotOpt = Log.nextOption(snapshotIterator) +var didClean = false +while (snapshotOpt.isDefined) { + val snapshot = snapshotOpt.get + val nextOpt = Log.nextOption(snapshotIterator) + if (predicate(snapshot, nextOpt)) { +if (deleteSnapshot(snapshot, nextOpt)) { + didClean = true + snapshotOpt = nextOpt +} else { +
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660836387 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord record) } +@SuppressWarnings("unchecked") +RestartRequest recordToRestartRequest(ConsumerRecord record, SchemaAndValue value) { +String connectorName = record.key().substring(RESTART_PREFIX.length()); +if (value.value() == null) { +log.error("Ignoring restart request because it is unexpectedly null"); +return null; +} +if (!(value.value() instanceof Map)) { +log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); +return null; +} + +Map valueAsMap = (Map) value.value(); + +Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); +if (failed == null) { +log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); Review comment: We log an error when we reject the request because of an invalid type. In this particular case we default the missing field instead of rejecting the request, which is why this is logged as a warning.
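The error-versus-warn distinction discussed in this comment can be illustrated with a small, dependency-free sketch. It is not the PR's implementation: the class name, the `"only-failed"` field value, and the in-memory `log` list (standing in for a real logger) are assumptions, and the handling of a field with the wrong type is a guess, since that branch is not shown in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class RestartRequestSketch {
    // Assumed field name; the real constant's value is not shown in this thread.
    static final String ONLY_FAILED_FIELD_NAME = "only-failed";
    static final boolean ONLY_FAILED_DEFAULT = false;

    // Collected log lines stand in for a real logger so the sketch has no dependencies.
    final List<String> log = new ArrayList<>();

    /** Returns the parsed onlyFailed flag, or null if the request is rejected. */
    Boolean parseOnlyFailed(Object value) {
        if (value == null) {
            log.add("ERROR value is null, ignoring request");
            return null;
        }
        if (!(value instanceof Map)) {
            log.add("ERROR value is not a Map, ignoring request");
            return null;
        }
        Map<?, ?> valueAsMap = (Map<?, ?>) value;
        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
        if (failed == null) {
            // Recoverable: the request is still honoured, using the default.
            log.add("WARN field missing, defaulting");
            return ONLY_FAILED_DEFAULT;
        }
        if (!(failed instanceof Boolean)) {
            // Assumption: a wrongly typed field is treated as a rejection.
            log.add("ERROR field has wrong type, ignoring request");
            return null;
        }
        return (Boolean) failed;
    }
}
```

The rule of thumb the sketch follows: ERROR when the request is dropped, WARN when processing continues with a fallback value.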
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660836052 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -312,26 +313,152 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { -case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + /** + * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the + * following invariants all hold true: + * + * This is not the latest snapshot (i.e., another snapshot proceeds this one) + * The offset of the next snapshot is greater than the log start offset + * The log can be advanced to the offset of the next snapshot + * + * This method is not thread safe and assumes a lock on the snapshots collection is held + */ + private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = { +nextSnapshotIdOpt.exists { nextSnapshotId => + if (snapshots.contains(snapshotId) && + snapshots.contains(nextSnapshotId) && + startOffset < nextSnapshotId.offset && + snapshotId.offset < nextSnapshotId.offset && + log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) { +log.deleteOldSegments() +val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] +snapshots.remove(snapshotId) match { + case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot) + case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.") +} +removeSnapshots(forgotten) +true + } else { +false + } +} + } - // Delete all 
segments that have a "last offset" less than the log start offset - log.deleteOldSegments() + /** + * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe + */ + private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = { +snapshots.keys.toSeq.flatMap { + snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())} +} + } - // Forget snapshots less than the log start offset - (true, forgetSnapshotsBefore(logStartSnapshotId)) -case _ => - (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + /** + * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records + */ + private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { +readSnapshot(snapshotId).asScala.flatMap { reader => + val it = reader.records().batchIterator() + if (it.hasNext) { +Some(it.next.maxTimestamp()) + } else { +None } } + } -removeSnapshots(forgottenSnapshots) -deleted + /** + * Perform cleaning of old snapshots and log segments based on size. + * + * If our configured retention size has been violated, we perform cleaning as follows: + * + * Find oldest snapshot and delete it + * Advance log start offset to end of next oldest snapshot + * Delete log segments which wholly precede the new log start offset + * + * This process is repeated until the retention size is no longer violated, or until only + * a single snapshot remains. + */ + override def maybeClean(): Boolean = { +snapshots synchronized { + cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs() +} + } + + /** + * Iterate through the snapshots a test the given predicate to see if we should attempt to delete it. Since + * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in + * all cases. 
+ * + * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. + */ + private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = { +val snapshotIterator = snapshots.keys.iterator +var snapshotOpt = Log.nextOption(snapshotIterator) +var didClean = false +while (snapshotOpt.isDefined) { + val snapshot = snapshotOpt.get + val nextOpt = Log.nextOption(snapshotIterator) + if (predicate(snapshot, nextOpt)) { +if (deleteSnapshot(snapshot, nextOpt)) { Review comment: That's a good point, I'll clean this up a bit
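The size-based cleaning described in the quoted doc comment (delete the oldest snapshot, advance the log start offset to the next snapshot, repeat until within the limit or only one snapshot remains) can be sketched in Java as below. This is a simplified illustration with hypothetical names: it counts only snapshot bytes, uses plain `long` offsets instead of `OffsetAndEpoch`, and omits segment deletion and the case where the log refuses to advance.

```java
import java.util.TreeMap;

final class RetentionCleanerSketch {
    // Snapshot end offset -> size in bytes, ordered oldest first.
    final TreeMap<Long, Long> snapshots = new TreeMap<>();
    long logStartOffset = 0L;

    private long totalBytes() {
        long total = 0L;
        for (long size : snapshots.values()) {
            total += size;
        }
        return total;
    }

    /**
     * Deletes the oldest snapshots until the total size is within retentionBytes
     * or only one snapshot remains. Returns true if anything was deleted.
     */
    boolean cleanBySize(long retentionBytes) {
        boolean didClean = false;
        while (snapshots.size() > 1 && totalBytes() > retentionBytes) {
            Long oldest = snapshots.firstKey();
            // Non-null because at least two snapshots exist.
            Long next = snapshots.higherKey(oldest);
            snapshots.remove(oldest);
            // The log may only start where the next snapshot ends.
            logStartOffset = Math.max(logStartOffset, next);
            didClean = true;
        }
        return didClean;
    }
}
```

The latest snapshot is never removed, so a reader can always recover state from it plus the log that follows.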
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660834568 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } +@Override +public void restartConnectorAndTasks( +RestartRequest request, +Callback callback +) { +final String connectorName = request.connectorName(); +addRequest( +() -> { +if (checkRebalanceNeeded(callback)) { +return null; +} +if (!configState.connectors().contains(request.connectorName())) { +callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); +return null; +} +if (isLeader()) { +// Write a restart request to the config backing store, to be executed asynchronously in tick() +configBackingStore.putRestartRequest(request); +// Compute and send the response that this was accepted +Optional maybePlan = buildRestartPlan(request); Review comment: The plan may or may not be present, that's why I was calling it maybePlan
[GitHub] [kafka] jsancio commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
jsancio commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r660808793 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -16,32 +16,37 @@ */ package kafka.raft -import java.io.File -import java.nio.file.{Files, NoSuchFileException, Path} -import java.util.{Optional, Properties} import kafka.api.ApiVersion import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal} +import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal} import kafka.utils.{CoreUtils, Logging, Scheduler} +import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import java.io.File +import java.nio.file.{Files, NoSuchFileException, Path} +import java.util.{Optional, Properties} import scala.annotation.nowarn import scala.collection.mutable import scala.compat.java8.OptionConverters._ final class KafkaMetadataLog private ( - log: Log, + val log: Log, + time: Time, scheduler: Scheduler, // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. 
- snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], + val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], Review comment: I don't think we should do this. Access to this data needs to be synchronized. It is also very likely that this is an implementation detail that will change in the near future. ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -244,10 +246,10 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception { assertEquals(snapshotId, snapshot.snapshotId()); snapshot.freeze(); } - +context.log.deleteBeforeSnapshot(snapshotId); context.client.poll(); -assertEquals(snapshotId.offset, context.log.startOffset()); +//context.log.logStartOffset(snapshotId.offset); Review comment: Commented out code. ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -316,6 +316,11 @@ public void flush() { lastFlushedOffset = endOffset().offset; } +@Override +public boolean maybeClean() { +return false; +} Review comment: Are you planning to implement this and use this instead of `deleteBeforeSnapshot` below? Most of the tests in the `raft` module depend on `MockLog` having the same semantic as `KafkaMetadataLog`. ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -312,26 +313,152 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { -case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + /** + * Delete a snapshot, advance the log start offset, and clean old log segments. 
This will only happen if the + * following invariants all hold true: + * + * This is not the latest snapshot (i.e., another snapshot proceeds this one) + * The offset of the next snapshot is greater than the log start offset + * The log can be advanced to the offset of the next snapshot + * + * This method is not thread safe and assumes a lock on the snapshots collection is held + */ + private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = { +nextSnapshotIdOpt.exists { nextSnapshotId => + if (snapshots.contains(snapshotId) && + snapshots.contains(nextSnapshotId) && + startOffset < nextSnapshotId.offset && + snapshotId.offset < nextSnapshotId.offset && + log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) { +log.deleteOldSegments() +val forgotten = mutable.Tree
[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-870786796 Hi again @dengziming. There was one more change to the API (should be the last one this time). It includes adding a new class that I implement in this PR: https://github.com/apache/kafka/pull/10892 Please check out the KIP for more details on the changes to DescribeTopicsResult and the new APIs for describing with topic IDs. Thanks for your diligence and patience with this one.
[GitHub] [kafka] rajinisivaram commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
rajinisivaram commented on pull request #10930: URL: https://github.com/apache/kafka/pull/10930#issuecomment-870776438 Thanks @guozhangwang ! Yes, we should cherry-pick to 2.8 and 2.7. I can do that.
[GitHub] [kafka] jeffkbkim commented on pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot
jeffkbkim commented on pull request #10940: URL: https://github.com/apache/kafka/pull/10940#issuecomment-870768034 @ijuma @hachikuji Thanks for the review. I'll add a benchmark; I'm not sure whether there's any other way to test the performance.
[GitHub] [kafka] ijuma commented on a change in pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot
ijuma commented on a change in pull request #10940: URL: https://github.com/apache/kafka/pull/10940#discussion_r660800092 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4200,6 +4200,7 @@ public ListOffsetsResult listOffsets(Map topicPartit Map> futures) { MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response")); +Cluster clusterSnapshot = mr.cluster(); Review comment: yeah, I was thinking the same.
[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
jsancio commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660788266 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -1014,6 +1015,26 @@ public void verify() { } } +private static class LeaderNeverLoadSnapshot implements Invariant { +final Cluster cluster; + +private LeaderNeverLoadSnapshot(Cluster cluster) { +this.cluster = cluster; +} + +@Override +public void verify() { +for (RaftNode raftNode : cluster.running()) { +if (raftNode.counter.isWritable()) { +assertTrue(raftNode.counter.getHandleSnapshotCalls() == 0); Review comment: Let's use `assertEquals`: ```suggestion assertEquals(0, raftNode.counter.getHandleSnapshotCalls()); ``` ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -157,13 +160,18 @@ public synchronized void handleSnapshot(SnapshotReader reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", -committed, newLeader); +committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } +handleSnapshotCalls = 0; +} + +public int getHandleSnapshotCalls() { Review comment: The Kafka project tends not to prefix accessors with `get`: ```suggestion public int handleSnapshotCalls() { ```
[GitHub] [kafka] hachikuji commented on a change in pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot
hachikuji commented on a change in pull request #10940: URL: https://github.com/apache/kafka/pull/10940#discussion_r660787940 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4200,6 +4200,7 @@ public ListOffsetsResult listOffsets(Map topicPartit Map> futures) { MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response")); +Cluster clusterSnapshot = mr.cluster(); Review comment: Having names that sound like simple accessors probably makes this kind of performance issue more likely. Maybe we could rename `cluster()` to `buildCluster()` so that it is clear there is some work.
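The point made here (a method that looks like an accessor but rebuilds an object on every call should be invoked once and its result reused) can be shown with a minimal sketch. `Response`, `buildCluster()`, and `countKnown` are hypothetical stand-ins and do not reproduce the `MetadataResponse` or `KafkaAdminClient` code; the `builds` counter exists only to make the cost observable.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotReuseSketch {
    /** Hypothetical stand-in for a response whose "accessor" rebuilds a view on every call. */
    static final class Response {
        private final List<String> partitions;
        int builds = 0; // counts how often the expensive view was constructed

        Response(List<String> partitions) {
            this.partitions = partitions;
        }

        // Named buildCluster() rather than cluster() so the cost is visible at the call site.
        List<String> buildCluster() {
            builds++;
            return new ArrayList<>(partitions); // the copy models the per-call construction cost
        }
    }

    /** Builds the snapshot once, outside the loop, and reuses it for every lookup. */
    static int countKnown(Response response, List<String> wanted) {
        List<String> clusterSnapshot = response.buildCluster();
        int known = 0;
        for (String partition : wanted) {
            if (clusterSnapshot.contains(partition)) {
                known++;
            }
        }
        return known;
    }
}
```

Calling `buildCluster()` inside the loop instead would construct one view per requested partition, which is the kind of hidden per-iteration cost the rename is meant to make obvious.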
[GitHub] [kafka] cmccabe merged pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot
cmccabe merged pull request #10899: URL: https://github.com/apache/kafka/pull/10899
[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update
[ https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-12677: --- Labels: kip-500 (was: ) > The raftCluster always send to the wrong active controller and never update > --- > > Key: KAFKA-12677 > URL: https://issues.apache.org/jira/browse/KAFKA-12677 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > KIP-500 introduces a self-managed metadata quorum. There should > always be exactly 1 active controller, and all RPCs should be sent to the active > controller. However, there is a chance that the active controller has already changed > while RPCs are still being sent to the old one. > In the attached log, we can see: > {code:java} > [Controller 3002] Becoming active at controller epoch 1. > ... > [Controller 3000] Becoming active at controller epoch 2. > {code} > So the latest active controller should be 3000. But the create-topic RPCs are > all being sent to controller 3002: > {code:java} > "errorMessage":"The active controller appears to be node 3000" > {code} > This bug makes the RaftClusterTests flaky. > > Debug log while running testCreateClusterAndCreateListDeleteTopic test: > https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660782579 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord record) } +@SuppressWarnings("unchecked") +RestartRequest recordToRestartRequest(ConsumerRecord record, SchemaAndValue value) { +String connectorName = record.key().substring(RESTART_PREFIX.length()); +if (value.value() == null) { +log.error("Ignoring restart request because it is unexpectedly null"); +return null; +} +if (!(value.value() instanceof Map)) { +log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); +return null; +} + +Map valueAsMap = (Map) value.value(); + +Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); +if (failed == null) { +log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); +failed = ONLY_FAILED_DEFAULT; +} +if (!(failed instanceof Boolean)) { +log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed.getClass(), ONLY_FAILED_DEFAULT); +failed = ONLY_FAILED_DEFAULT; +} +boolean onlyFailed = (Boolean) failed; Review comment: I agree, and I fixed it as you suggested.
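The null check and the type check on the only-failed field in the diff above can be folded into a single branch, because `instanceof` evaluates to false for `null`. The following is a minimal sketch of that idea, not the change actually made in the PR: the class `RestartFieldSketch`, the method `parseOnlyFailed`, and the two constant values are assumptions for illustration (the diff shows only the constant names), and logging is omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class RestartFieldSketch {
    // Assumed values for illustration; the diff shows only the constant names.
    public static final String ONLY_FAILED_FIELD_NAME = "only-failed";
    public static final boolean ONLY_FAILED_DEFAULT = false;

    /**
     * Returns the Boolean stored under ONLY_FAILED_FIELD_NAME, or the default
     * when the field is missing or has another type. A missing field yields
     * null, and "null instanceof Boolean" is false, so one check covers both.
     */
    public static boolean parseOnlyFailed(Map<String, Object> valueAsMap) {
        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
        if (failed instanceof Boolean) {
            return (Boolean) failed;
        }
        return ONLY_FAILED_DEFAULT;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new HashMap<>();
        System.out.println(parseOnlyFailed(value)); // field missing
        value.put(ONLY_FAILED_FIELD_NAME, "yes");
        System.out.println(parseOnlyFailed(value)); // field has the wrong type
        value.put(ONLY_FAILED_FIELD_NAME, true);
        System.out.println(parseOnlyFailed(value)); // field is a Boolean
    }
}
```

A real implementation would presumably still log distinct warnings for the missing and mistyped cases, as the diff does; the sketch only shows that the cast is safe after a single `instanceof` test.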
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660781818 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map connectorProps, Callback }); } +/** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ +public Optional buildRestartPlan(RestartRequest request) { +String connectorName = request.connectorName(); +ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); +if (connectorStatus == null) { +return Optional.empty(); +} + +// If requested, mark the connector as restarting +AbstractStatus.State connectorState; +if (request.shouldRestartConnector(connectorStatus)) { +connectorState = AbstractStatus.State.RESTARTING; +} else { +connectorState = connectorStatus.state(); +} Review comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12992) Make kraft configuration properties public
[ https://issues.apache.org/jira/browse/KAFKA-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao reassigned KAFKA-12992: --- Assignee: HaiyuanZhao > Make kraft configuration properties public > -- > > Key: KAFKA-12992 > URL: https://issues.apache.org/jira/browse/KAFKA-12992 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Jose Armando Garcia Sancio >Assignee: HaiyuanZhao >Priority: Blocker > Fix For: 3.0.0 > > > All of the Kraft configurations should be made public: > {code:java} > /* > * KRaft mode configs. Note that these configs are defined as > internal. We will make them public in the 3.0.0 release. > */ > .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), > ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc) > .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, > NodeIdDoc) > .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, > Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, > InitialBrokerRegistrationTimeoutMsDoc) > .defineInternal(BrokerHeartbeatIntervalMsProp, INT, > Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, > BrokerHeartbeatIntervalMsDoc) > .defineInternal(BrokerSessionTimeoutMsProp, INT, > Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc) > .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, > MetadataLogDirDoc) > .defineInternal(ControllerListenerNamesProp, STRING, null, null, > HIGH, ControllerListenerNamesDoc) > .defineInternal(SaslMechanismControllerProtocolProp, STRING, > SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, > SaslMechanismControllerProtocolDoc) > {code} > > https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/KafkaConfig.scala#L1043-L1053 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jeffkbkim opened a new pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot
jeffkbkim opened a new pull request #10940: URL: https://github.com/apache/kafka/pull/10940 In getListOffsetsCalls, we rebuild the cluster snapshot for every topic partition. Instead, we should build the snapshot once and reuse it. This reduces the time complexity from O(n^2) to O(n). For manual testing (using AK 2.8), I passed a map of 6K topic partitions to listOffsets. Without snapshot reuse: duration of building futures from metadata response: **15582** milliseconds; total duration of listOffsets: **15743** milliseconds. With reuse: duration of building futures from metadata response: **24** milliseconds; total duration of listOffsets: **235** milliseconds. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
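The complexity claim in the PR description above can be illustrated with a small stand-alone sketch. It is not Kafka code: `FakeMetadataResponse`, its `cluster()` method, and `buildCount` are hypothetical stand-ins, assuming only that each `cluster()` call does work proportional to the number of partitions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SnapshotReuseSketch {
    /** Hypothetical stand-in for a metadata response; not Kafka's MetadataResponse. */
    public static final class FakeMetadataResponse {
        private final List<String> partitions;
        public int buildCount = 0; // number of times the snapshot was rebuilt

        public FakeMetadataResponse(List<String> partitions) {
            this.partitions = partitions;
        }

        /** Looks like a simple accessor, but copies every partition on each call: O(n). */
        public Set<String> cluster() {
            buildCount++;
            return new HashSet<>(partitions);
        }
    }

    /** Rebuilds the snapshot once per requested partition: n builds of O(n) work each. */
    public static int countKnownWithoutReuse(FakeMetadataResponse mr, List<String> wanted) {
        int known = 0;
        for (String tp : wanted) {
            if (mr.cluster().contains(tp)) {
                known++;
            }
        }
        return known;
    }

    /** Builds the snapshot once and reuses it: one O(n) build plus n constant-time lookups. */
    public static int countKnownWithReuse(FakeMetadataResponse mr, List<String> wanted) {
        Set<String> clusterSnapshot = mr.cluster();
        int known = 0;
        for (String tp : wanted) {
            if (clusterSnapshot.contains(tp)) {
                known++;
            }
        }
        return known;
    }
}
```

Both methods return the same count; only the number of snapshot builds differs, which is the effect the one-line `clusterSnapshot` change in the PR is after.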
[GitHub] [kafka] zhaohaidao commented on pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
zhaohaidao commented on pull request #10932: URL: https://github.com/apache/kafka/pull/10932#issuecomment-870739108 @jsancio Thanks for your advice. The comments have been addressed. Please continue to review when you have time.
[GitHub] [kafka] rondagostino commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config
rondagostino commented on pull request #10811: URL: https://github.com/apache/kafka/pull/10811#issuecomment-870738717 Regarding the bootstrap use cases, these were enumerated in [KIP-515: Enable ZK client to use the new TLS supported authentication](https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication) when we added ZooKeeper TLS, and while that was a while ago, yes, it is still necessary to be able to bootstrap configs prior to starting Kafka.
[jira] [Resolved] (KAFKA-12520) Producer state is needlessly rebuilt on startup
[ https://issues.apache.org/jira/browse/KAFKA-12520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-12520. - Fix Version/s: 3.0.0 Assignee: Cong Ding (was: Dhruvil Shah) Resolution: Fixed merged the PR to trunk > Producer state is needlessly rebuilt on startup > --- > > Key: KAFKA-12520 > URL: https://issues.apache.org/jira/browse/KAFKA-12520 > Project: Kafka > Issue Type: Bug >Reporter: Dhruvil Shah >Assignee: Cong Ding >Priority: Major > Fix For: 3.0.0 > > > When we find a {{.swap}} file on startup, we typically want to rename and > replace it as {{.log}}, {{.index}}, {{.timeindex}}, etc. as a way to complete > any ongoing replace operations. These swap files are usually known to have > been flushed to disk before the replace operation begins. > One flaw in the current logic is that when we recover these swap files on > startup, we end up truncating the producer state and rebuild it from scratch. > This is unneeded as the replace operation does not mutate the producer state > by itself. It is only meant to replace the {{.log}} file along with > corresponding indices. > Because of this unneeded producer state rebuild operation, we have seen > multi-hour startup times for clusters that have large compacted topics.
[GitHub] [kafka] junrao merged pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao merged pull request #10763: URL: https://github.com/apache/kafka/pull/10763
[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
zhaohaidao commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660772538 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -38,6 +38,9 @@ private OptionalInt claimedEpoch = OptionalInt.empty(); private long lastOffsetSnapshotted = -1; +private int handleSnapshotCalls = 0; +private boolean handleSnapshotCalled = false; Review comment: fair enough
[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…
zhaohaidao commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660772174 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -157,13 +162,27 @@ public synchronized void handleSnapshot(SnapshotReader reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", -committed, newLeader); +committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } +handleSnapshotCalled = false; +handleSnapshotCalls = 0; +} + +public boolean isLeader() { +return this.client.leaderAndEpoch().isLeader(nodeId); Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy closed pull request #10866: [Test ] debug zk acl errors
omkreddy closed pull request #10866: URL: https://github.com/apache/kafka/pull/10866
[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required
junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660769175 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -192,20 +230,10 @@ object LogLoader extends Logging { debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) } else if (filename.endsWith(CleanedFileSuffix)) { -minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) -cleanFiles += file +minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset) +cleanedFiles += file } else if (filename.endsWith(SwapFileSuffix)) { -// we crashed in the middle of a swap operation, to recover: -// if a log, delete the index files, complete the swap operation later -// if an index just delete the index files, they will be rebuilt -val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) -info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.") -if (Log.isIndexFile(baseFile)) { - deleteIndicesIfExist(baseFile) -} else if (Log.isLogFile(baseFile)) { - deleteIndicesIfExist(baseFile) - swapFiles += file -} +swapFiles += file Review comment: Thanks for the explanation. Make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
guozhangwang commented on pull request #10930: URL: https://github.com/apache/kafka/pull/10930#issuecomment-870715774 Merged to trunk. Thanks @rajinisivaram! Should we cherry-pick to older branches too?
[GitHub] [kafka] guozhangwang merged pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs
guozhangwang merged pull request #10930: URL: https://github.com/apache/kafka/pull/10930
[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
cmccabe commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r660743774 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + + +/** + * Represents changes to the topics in the metadata image. + */ +public final class TopicsDelta { +private final TopicsImage image; + +/** + * A map from topic IDs to the topic deltas for each topic. Topics which have been + * deleted will not appear in this map. + */ +private final Map changedTopics = new HashMap<>(); + +/** + * The IDs of topics that exist in the image but that have been deleted. 
Note that if + * a topic does not exist in the image, it will also not exist in this set. Topics + * that are created and then deleted within the same delta will leave no trace. + */ +private final Set deletedTopicIds = new HashSet<>(); Review comment: No worries. Thanks for taking a look.
[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.
IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772 ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +63,88 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicPartition("testTopic1", 0); +val testTopicPartition1 = new TopicPartition("testTopic1", 1); +val testTopicPartition2 = new TopicPartition("testTopic1", 2); +val testTopicPartition3 = new TopicPartition("testTopic2", 0); +val testTopicPartition4 = new TopicPartition("testTopic2", 1); +val testTopicPartition5 = new TopicPartition("testTopic2", 2); + +// Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined +val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition3 -> new OffsetAndMetadata(100), + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, +).asJava + +val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) +val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), +) +val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) +val unassignedTopicPartitions = 
Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + +val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + +def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = { + topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions) +} + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription +when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava)) +when(admin.listOffsets( + ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), + any() +)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava)) + +val (state, assignments) = groupService.collectGroupOffsets(group) +val returnedOffsets = assignments.map { results => + results.map { assignment => +new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset + }.toMap +}.getOrElse(Map.empty) +// Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None) +// Results should have information only for unassigned topic partitions 
if and only if there is information about them (including with null values) Review comment: You are right, it's not relevant because all partitions have information now. Going to remove ## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ## @@ -62,6 +63,88 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { +val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") +val groupService = consumerGroupService(args) + +val testTopicPartition0 = new TopicP