[GitHub] [kafka] chernyih commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases
chernyih commented on PR #12131: URL: https://github.com/apache/kafka/pull/12131#issuecomment-1123211008 Retry backoff doesn't help. `KafkaAdminClient` fetches metadata from bootstrap server, let's say with a call timeout of 5s. Within 5s, `NetworkClient` can connect to the bootstrap server multiple times based on reconnect backoff. After 5s, fetch metadata call times out because the bootstrap server node is not ready. Since the last attempt of fetch metadata request is 5 seconds ago (which should be more than retry backoff), `KafkaAdminClient` will retry fetching metadata again. Another case is if there are clients that use `NetworkClient` and don't implement retry backoff, then retry backoff has no effect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level
ijuma commented on code in PR #12097: URL: https://github.com/apache/kafka/pull/12097#discussion_r869841007 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.ProducerIdAndEpoch; + +class TxnPartitionMap { + +final Map topicPartitions = new HashMap<>(); + +TxnPartitionEntry get(TopicPartition topicPartition) { Review Comment: I removed the `Partition` suffix from this and `getOrCreate`. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.ProducerIdAndEpoch; + +class TxnPartitionBookkeeper { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12144: MINOR: reload4j build dependency fixes
ijuma commented on PR #12144: URL: https://github.com/apache/kafka/pull/12144#issuecomment-1123142383 Merged to trunk and cherry-picked to 3.2 and 3.1 branches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12146: MINOR: Remove kraft authorizer from list of missing features
ijuma commented on code in PR #12146: URL: https://github.com/apache/kafka/pull/12146#discussion_r869838507 ## config/kraft/README.md: ## @@ -114,9 +114,7 @@ We don't support any kind of upgrade right now, either to or from KRaft mode. T Finally, the following Kafka features have not yet been fully implemented: -* Support for certain security features: configuring a KRaft-based Authorizer, setting up SCRAM, delegation tokens, and so forth - (although note that you can use authorizers such as `kafka.security.authorizer.AclAuthorizer` with KRaft clusters, even - if they are ZooKeeper-based: simply define `authorizer.class.name` and configure the authorizer as you normally would). +* Support for certain security features: setting up SCRAM, delegation tokens, and so forth. * Support for some configurations, like enabling unclean leader election by default or dynamically changing broker endpoints Review Comment: @jsancio did you fix the unclean leader election thing with your KIP? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
showuon commented on PR #12112: URL: https://github.com/apache/kafka/pull/12112#issuecomment-1123139779 @dengziming , thanks for the update! @divijvaidya , if you have no other comments, I'm going to merge this PR within this week. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed
C0urante commented on PR #12041: URL: https://github.com/apache/kafka/pull/12041#issuecomment-1123137976 @guozhangwang to be clear, nobody is advocating that we call `ignore` on everything. I was proposing that we call `ignore` on everything that's already defined, which is pretty clear if you read the code example I gave. Does that clear things up? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #12062: KAFKA-13833: Remove the min_version_level from the finalized version written to ZooKeeper
dengziming commented on code in PR #12062: URL: https://github.com/apache/kafka/pull/12062#discussion_r869835226 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -86,15 +86,15 @@ class DefaultApiVersionManager( throttleTimeMs, interBrokerProtocolVersion.highestSupportedRecordVersion, supportedFeatures, -finalizedFeatures.features, +finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, Review Comment: Yes, we should convert it manually since scala won't do it automatically. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #12144: MINOR: reload4j build dependency fixes
ijuma merged PR #12144: URL: https://github.com/apache/kafka/pull/12144 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
dengziming commented on PR #12104: URL: https://github.com/apache/kafka/pull/12104#issuecomment-1123116129 Hello @vamossagar12, sorry for changing around, we added a `TestUtils.ensureConsistentKRaftMetadata` in #12108 to fix this kind of flakiness, which waits until the brokers have caught up to the controller metadata topic end offset, instead of waiting for something really specific, you can just use this method for KRaft mode, and keep it was for zk mode. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
dengziming commented on PR #12112: URL: https://github.com/apache/kafka/pull/12112#issuecomment-1123114260 Hello @showuon @divijvaidya, #12108 has been merged, we added `TestUtils.ensureConsistentKRaftMetadata` to fix all this type of flakiness, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level
ijuma commented on code in PR #12097: URL: https://github.com/apache/kafka/pull/12097#discussion_r869795034 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.ProducerIdAndEpoch; + +class TxnPartitionBookkeeper { Review Comment: Sounds good, will rename. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level
hachikuji commented on code in PR #12097: URL: https://github.com/apache/kafka/pull/12097#discussion_r869776847 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.ProducerIdAndEpoch; + +class TxnPartitionBookkeeper { Review Comment: I think basically we're providing a high level view into a map. How about `TxnPartitionMap`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading
cmccabe commented on code in PR #11969: URL: https://github.com/apache/kafka/pull/11969#discussion_r869766209 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend sensor } } + + /** + * Close `channel` and decrement the connection count. + */ + def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = { +if (channel != null) { + debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}") + dec(listenerName, channel.socket.getInetAddress) + closeSocket(channel, this) Review Comment: Update: I have changed it so that `closeChannel` takes a `Logging` argument (which will be either the Acceptor or Processor). This should lead to the log message about closure being listened under either the Acceptor or Processor, as it was before this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-7527) Enable Dependency Injection for Kafka Streams handlers (KIP-378)
[ https://issues.apache.org/jira/browse/KAFKA-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7527. Resolution: Fixed > Enable Dependency Injection for Kafka Streams handlers (KIP-378) > > > Key: KAFKA-7527 > URL: https://issues.apache.org/jira/browse/KAFKA-7527 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0, 2.1.0 >Reporter: Wladimir Schmidt >Priority: Minor > Labels: kip, usability > > Implement solution proposed in the KIP-378 (Enable Dependency Injection for > Kafka Streams handlers). > Link to > [KIP-378|https://cwiki.apache.org/confluence/display/KAFKA/KIP-378:+Enable+Dependency+Injection+for+Kafka+Streams+handlers] -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mjsax commented on a diff in pull request #12122: WIP: Upgrade tests for KAFKA-13769
mjsax commented on code in PR #12122: URL: https://github.com/apache/kafka/pull/12122#discussion_r869751965 ## streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -63,21 +80,30 @@ public static void main(final String[] args) throws Exception { })); } -private static ProcessorSupplier printProcessorSupplier() { +private static void buildFKTable(final KStream primaryTable, +final KTable otherTable) { Review Comment: nit: indention ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -37,6 +37,7 @@ metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +fk_join_versions = [str(V_2_4_0)] Review Comment: We only test with latest bug-fix releases, so we can use `LATEST_2_4` here. ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -37,6 +37,7 @@ metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +fk_join_versions = [str(V_2_4_0)] Review Comment: Should we test all versions 2.4 - 3.1 ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases
ijuma commented on PR #12131: URL: https://github.com/apache/kafka/pull/12131#issuecomment-1122986117 Thanks for the PR. One question: did retry backoff help in this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
mjsax commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r869501138 ## streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java: ## @@ -19,15 +19,52 @@ import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; /** * This interface controls the strategy that can be used to control how we emit results in a processor. */ public interface EmitStrategy { +Logger log = LoggerFactory.getLogger(EmitStrategy.class); + enum StrategyType { -ON_WINDOW_CLOSE, -ON_WINDOW_UPDATE +ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()), +ON_WINDOW_CLOSE(1, new WindowCloseStrategy()); + +private final short code; +private final EmitStrategy strategy; + +private short code() { +return this.code; +} + +private EmitStrategy strategy() { +return this.strategy; +} + +StrategyType(final int code, final EmitStrategy strategy) { +this.code = (short) code; +this.strategy = strategy; +} + +private final static Map TYPE_TO_STRATEGY = new HashMap<>(); + +static { +for (final StrategyType type : StrategyType.values()) { +if (TYPE_TO_STRATEGY.put(type.code(), type.strategy()) != null) +throw new IllegalStateException("Code " + type.code() + " for type " + Review Comment: Never seen anything like this before -- is it best practice to have a guard like this? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor
[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
[ https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jakub Miroś reassigned KAFKA-13817: --- Assignee: (was: Jakub Miroś) > Schedule nextTimeToEmit to system time every time instead of just once > -- > > Key: KAFKA-13817 > URL: https://issues.apache.org/jira/browse/KAFKA-13817 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Priority: Minor > Labels: beginner, newbie > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.] > > If this is just scheduled once, this can trigger emit every time if system > time jumps a lot suddenly. > > For example, > # nextTimeToEmit set to 1 and step is 1 > # If next system time jumps to 100, we will always emit for next 100 records -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
[ https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534600#comment-17534600 ] Andrew Grant commented on KAFKA-13892: -- Merged in https://github.com/apache/kafka/commit/040b11d70594e0499e96014e17a307366b640444 > Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager > > > Key: KAFKA-13892 > URL: https://issues.apache.org/jira/browse/KAFKA-13892 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] > we loop through the ACL filters and and add RemoveAccessControlEntryRecord > records to the response list for each matching ACL. I think there's a bug > where if two filters match the same ACL, we create two > RemoveAccessControlEntryRecord records for that same ACL. This is an issue > because upon replay we throw an exception > (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) > if the ACL is not in the in-memory data structures which will happen to the > second RemoveAccessControlEntryRecord. > Maybe we can just de-dupe both List and > List? I think something like (just showing code for > ApiMessageAndVersion): > {code:java} > private List > deDupeApiMessageAndVersion(List messages) { > return new HashSet<>(messages).stream().collect(Collectors.toList()); > }{code} > should suffice as I don't think the ordering matters within the list of > response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
[ https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant resolved KAFKA-13892. -- Resolution: Fixed > Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager > > > Key: KAFKA-13892 > URL: https://issues.apache.org/jira/browse/KAFKA-13892 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] > we loop through the ACL filters and and add RemoveAccessControlEntryRecord > records to the response list for each matching ACL. I think there's a bug > where if two filters match the same ACL, we create two > RemoveAccessControlEntryRecord records for that same ACL. This is an issue > because upon replay we throw an exception > (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) > if the ACL is not in the in-memory data structures which will happen to the > second RemoveAccessControlEntryRecord. > Maybe we can just de-dupe both List and > List? I think something like (just showing code for > ApiMessageAndVersion): > {code:java} > private List > deDupeApiMessageAndVersion(List messages) { > return new HashSet<>(messages).stream().collect(Collectors.toList()); > }{code} > should suffice as I don't think the ordering matters within the list of > response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
[ https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jakub Miroś reassigned KAFKA-13817: --- Assignee: Jakub Miroś > Schedule nextTimeToEmit to system time every time instead of just once > -- > > Key: KAFKA-13817 > URL: https://issues.apache.org/jira/browse/KAFKA-13817 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Jakub Miroś >Priority: Minor > Labels: beginner, newbie > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.] > > If this is just scheduled once, this can trigger emit every time if system > time jumps a lot suddenly. > > For example, > # nextTimeToEmit set to 1 and step is 1 > # If next system time jumps to 100, we will always emit for next 100 records -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13879) Exponential backoff for reconnect does not work
[ https://issues.apache.org/jira/browse/KAFKA-13879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chern Yih Cheah resolved KAFKA-13879. - Resolution: Fixed > Exponential backoff for reconnect does not work > --- > > Key: KAFKA-13879 > URL: https://issues.apache.org/jira/browse/KAFKA-13879 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 2.7.0 >Reporter: Chern Yih Cheah >Assignee: Chern Yih Cheah >Priority: Minor > > When a client connects to a SSL listener using PLAINTEXT security protocol, > after the TCP connection is setup, the client considers the channel setup is > complete (in reality the channel setup is not complete yet). The client > issues API version request after that. When issuing API version request, > reconnection exponential backoff is reset. Since the broker expects SSL > handshake, client's API version request will cause the connection to > disconnect. Reconnect will happen without exponential backoff since it has > been reset. > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249.] > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] cmccabe merged pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager
cmccabe merged PR #12145: URL: https://github.com/apache/kafka/pull/12145 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
ahuang98 commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869715273 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -70,13 +47,17 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") + val metadataVersion = getMetadataVersion(namespace) + if (metadataVersion.isLessThan(MetadataVersion.IBP_3_0_IV0)) { +throw new TerseFailure(s"Cannot specify a metadata version less than ${MetadataVersion.IBP_3_0_IV0.featureLevel()}.") Review Comment: nit: Create a `MetadataVersion.isKRaftSupported()` / `MetadataVersion.firstKRaftMetadataVersion()` method? You could use it in https://github.com/apache/kafka/pull/12050/files#diff-2ee48010b1035f21f5ebdd47a78fc632ed239179aee77e66d75a6bdb77ee8fd7R147 and https://github.com/apache/kafka/pull/12050/files#diff-602affccc2af320d6cdfda31afc197bf1a798497009c404706a600ab57c1bea5R78 to name a few places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request, #12146: MINOR: Remove kraft authorizer from list of missing features
ijuma opened a new pull request, #12146: URL: https://github.com/apache/kafka/pull/12146 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
ahuang98 commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869544110 ## clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java: ## @@ -91,6 +97,23 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe .setMaxVersion(maxVersion))); } +public NodeApiVersions(ApiVersionCollection nodeApiVersions, SupportedFeatureKeyCollection nodeSupportedFeatures) { +for (ApiVersion nodeApiVersion : nodeApiVersions) { Review Comment: Can we call `NodeApiVersions(ApiVersionCollection nodeApiVersions)` here to reduce some redundant code? ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -273,6 +301,40 @@ public static MetadataVersion latest() { return VALUES[VALUES.length - 1]; } +public Optional previous() { +int idx = this.ordinal(); +if (idx > 2) { +return Optional.of(MetadataVersion.values()[idx - 1]); +} else { +return Optional.empty(); +} +} + +public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) { Review Comment: Just clarifying, this is to check if the metadata has changed in an incompatible way between the sourceVersion and targetVersion? ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -273,6 +301,40 @@ public static MetadataVersion latest() { return VALUES[VALUES.length - 1]; } +public Optional previous() { +int idx = this.ordinal(); +if (idx > 2) { +return Optional.of(MetadataVersion.values()[idx - 1]); Review Comment: ```suggestion return Optional.of(VALUES[idx]); ``` The indexing is a bit unintuitive, I wonder if you might just want to store all of `values()` (including `UNINITIALIZED`) in `VALUES` just to make this more clear? ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -273,6 +301,40 @@ public static MetadataVersion latest() { return VALUES[VALUES.length - 1]; } +public Optional previous() { +int idx = this.ordinal(); +if (idx > 2) { +return Optional.of(MetadataVersion.values()[idx - 1]); +} else { +return Optional.empty(); +} +} + +public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) { Review Comment: Let's add a test for this ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -273,6 +301,40 @@ public static MetadataVersion latest() { return VALUES[VALUES.length - 1]; } +public Optional previous() { +int idx = this.ordinal(); Review Comment: The indexing for `ordinal()` starts at `0` right? With `MetadataVersion.UNINITIALIZED` at index 0 (which I'm assuming we don't want to count as a valid previous version), should the conditional be `if (idx > 1)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andymg3 commented on a diff in pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager
andymg3 commented on code in PR #12145: URL: https://github.com/apache/kafka/pull/12145#discussion_r869680507 ## metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java: ## @@ -152,7 +154,11 @@ ControllerResult> deleteAcls(List filter results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); } } -return ControllerResult.atomicOf(records, results); +return ControllerResult.atomicOf(dedupeApiMessageAndVersion(records), results); +} + +private List dedupeApiMessageAndVersion(List messages) { +return new HashSet<>(messages).stream().collect(Collectors.toList()); Review Comment: Good point. I think we could use a Set the whole way through except at the very end when we have to convert it to a List. Originally I was thinking about preserving the order of the messages as I did the de-dupe. I had the following at first: ``` return new LinkedHashSet<>(messages).stream().collect(Collectors.toList()); ``` but then I realized the ordering shouldnt matter. So I think I can make the suggested change to just use a Set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13621) Resign leader on network partition
[ https://issues.apache.org/jira/browse/KAFKA-13621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534578#comment-17534578 ] Jose Armando Garcia Sancio commented on KAFKA-13621: [~hachikuji] mentioned that after we approve and implement KIP-835 we are guaranteed for the leader to have record appended in every `metadata.monitor.write.interval.ms` (or `controller.monitor.write.interval.ms`). We can use this feature to resign the leader if written records don't commit after a fetch timeout. I think this solution seems reasonable. I have the following questions: # If we should do this in the Controller/metadata module or in the raft module? # How do we handle quorums that are fetching but are slow to catch-up to the high-watermark because they have a small log? > Resign leader on network partition > -- > > Key: KAFKA-13621 > URL: https://issues.apache.org/jira/browse/KAFKA-13621 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > h1. Motivation > If the current leader A at epoch X gets partition from the rest of the > quorum, quorum voter A will stay leader at epoch X. This happens because > voter A will never receive an request from the rest of the voters increasing > the epoch. These requests that typically increase the epoch of past leaders > are BeginQuorumEpoch and Vote. > In addition if voter A (leader at epoch X) doesn't get partition from the > rest of the brokers (observer in the KRaft protocol) the brokers will never > learn about the new quorum leader. This happens because 1. observers learn > about the leader from the Fetch response and 2. observer send a Fetch request > to a random leader if the Fetch request times out. > Neither of these two scenarios will cause the broker to send a request to a > different voter because the leader at epoch X will never send a different > leader in the response and the broker will never send a Fetch request to a > different voter because the Fetch request will never timeout. > h1. Proposed Changes > In this scenario the A, the leader at epoch X, will stop receiving Fetch > request from the majority of the voters. Voter A should resign as leader if > the Fetch request from the majority of the voters is old enough. A reasonable > value for "old enough" is the Fetch timeout value. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] hachikuji commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading
hachikuji commented on code in PR #11969: URL: https://github.com/apache/kafka/pull/11969#discussion_r868498298 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -725,7 +727,11 @@ object KafkaConfig { /* Authorizer Configuration ***/ val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements ${classOf[Authorizer].getName}" + - " interface, which is used by the broker for authorization." +" interface, which is used by the broker for authorization." + val EarlyStartListenersDoc = "A comma-separated list of listener names which should be started before any non-early start listeners. " + Review Comment: nit: the first sentence seems circular. I think it would be helpful to mention the authorizer use case. Also, can we say something about the default? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12062: KAFKA-13833: Remove the min_version_level from the finalized version written to ZooKeeper
mumrah commented on code in PR #12062: URL: https://github.com/apache/kafka/pull/12062#discussion_r869578975 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -86,15 +86,15 @@ class DefaultApiVersionManager( throttleTimeMs, interBrokerProtocolVersion.highestSupportedRecordVersion, supportedFeatures, -finalizedFeatures.features, +finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, Review Comment: Is this just needed to convert the Scala short to Java short in the map? ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -148,7 +148,10 @@ public enum MetadataVersion { IBP_3_1_IV0(3, "3.1", "IV0"), // Support for leader recovery for unclean leader election (KIP-704) -IBP_3_2_IV0(4, "3.2", "IV0"); +IBP_3_2_IV0(4, "3.2", "IV0"), + +// Removes min_version_level from the finalized version range that is written to ZooKeeper +IBP_3_2_IV1(5, "3.2", "IV1"); Review Comment: Should this be 3.3-IV0 since we're on the 3.3 release now? (i.e., current build version is 3.3-SNAPSHOT) ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -148,7 +148,10 @@ public enum MetadataVersion { IBP_3_1_IV0(3, "3.1", "IV0"), // Support for leader recovery for unclean leader election (KIP-704) -IBP_3_2_IV0(4, "3.2", "IV0"); +IBP_3_2_IV0(4, "3.2", "IV0"), + +// Removes min_version_level from the finalized version range that is written to ZooKeeper Review Comment: Can you add a mention of the KIP here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #12082: MINOR: Create case class to encapsulate fetch parameters and simplify handling
hachikuji merged PR #12082: URL: https://github.com/apache/kafka/pull/12082 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
cmccabe commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869639379 ## clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java: ## @@ -233,4 +256,8 @@ public ApiVersion apiVersion(ApiKeys apiKey) { public Map allSupportedApiVersions() { return supportedVersions; } + +public Map supportedFeatures() { +return Collections.unmodifiableMap(supportedFeatures); Review Comment: It seems like `supportedFeatures` should be immutable and final in any case. There is no use-case for mutating this after the object is constructed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager
hachikuji commented on code in PR #12145: URL: https://github.com/apache/kafka/pull/12145#discussion_r869633002 ## metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java: ## @@ -152,7 +154,11 @@ ControllerResult> deleteAcls(List filter results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); } } -return ControllerResult.atomicOf(records, results); +return ControllerResult.atomicOf(dedupeApiMessageAndVersion(records), results); +} + +private List dedupeApiMessageAndVersion(List messages) { +return new HashSet<>(messages).stream().collect(Collectors.toList()); Review Comment: nit: doesn't make a big difference, but any reason not to begin with a Set instead of converting from a List? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869631802 ## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ## @@ -55,25 +60,37 @@ public class FeatureControlManager { */ private final TimelineHashMap finalizedVersions; +/** + * The current metadata version + */ +private final TimelineObject metadataVersion; + +/** + * Collection of listeners for when features change + */ +private final Map listeners; FeatureControlManager(LogContext logContext, QuorumFeatures quorumFeatures, SnapshotRegistry snapshotRegistry) { this.log = logContext.logger(FeatureControlManager.class); this.quorumFeatures = quorumFeatures; this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0); +this.metadataVersion = new TimelineObject<>(snapshotRegistry, MetadataVersion.UNINITIALIZED); Review Comment: UNINITIALIZED was added since we need a non-null object to initialize the TimelineObject -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager
cmccabe commented on PR #12145: URL: https://github.com/apache/kafka/pull/12145#issuecomment-1122797307 > I don't think we need to dedupe List. It contains a list of results, where each result contains the ACLs that matched the filter. It should be OK for the same ACL to be in multiple AclDeleteResult results because it really could match multiple filters. Agreed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869621801 ## core/src/main/scala/kafka/server/ControllerServer.scala: ## @@ -63,6 +64,8 @@ class ControllerServer( val threadNamePrefix: Option[String], val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], val configSchema: KafkaConfigSchema, + val raftApiVersions: ApiVersions, + val bootstrapMetadata: Option[BootstrapMetadata] = None Review Comment: I still need to remove this Option and make it a regular argument. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #12142: MINOR: install Exit.exit handler in BrokerMetadataPublisherTest
cmccabe merged PR #12142: URL: https://github.com/apache/kafka/pull/12142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13862) Add And Subtract multiple config values is not supported in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13862. - Fix Version/s: 3.3.0 Resolution: Fixed > Add And Subtract multiple config values is not supported in KRaft mode > -- > > Key: KAFKA-13862 > URL: https://issues.apache.org/jira/browse/KAFKA-13862 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Assignee: dengziming >Priority: Major > Fix For: 3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] hachikuji merged pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
hachikuji merged PR #12108: URL: https://github.com/apache/kafka/pull/12108 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andymg3 opened a new pull request, #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager
andymg3 opened a new pull request, #12145: URL: https://github.com/apache/kafka/pull/12145 ## JIRA https://issues.apache.org/jira/browse/KAFKA-13892 ### Details In https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143 we loop through the ACL filters and and add `RemoveAccessControlEntryRecord` records to the response list for each matching ACL. There's a bug where if two (or more) filters match the same ACL, we create two (or more) `RemoveAccessControlEntryRecord` records for the same ACL. This is an issue because upon replay we throw an exception (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) if the ACL is not in the in-memory data structures which will happen to the second `RemoveAccessControlEntryRecord`. I don't think we need to dedupe `List`. It contains a list of results, where each result contains the ACLs that matched the filter. It should be OK for the same ACL to be in multiple AclDeleteResult results because it really could match multiple filters. ### Testing - Added a unit test that tests the new behavior ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
[ https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534543#comment-17534543 ] Andrew Grant commented on KAFKA-13892: -- I dont think we need to dedupe List. It contains a list of results, where each result contains the ACLs that matched the filter. It should be OK for the same ACL to be in multiple AclDeleteResult results because it really could match multiple filters. > Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager > > > Key: KAFKA-13892 > URL: https://issues.apache.org/jira/browse/KAFKA-13892 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] > we loop through the ACL filters and and add RemoveAccessControlEntryRecord > records to the response list for each matching ACL. I think there's a bug > where if two filters match the same ACL, we create two > RemoveAccessControlEntryRecord records for that same ACL. This is an issue > because upon replay we throw an exception > (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) > if the ACL is not in the in-memory data structures which will happen to the > second RemoveAccessControlEntryRecord. > Maybe we can just de-dupe both List and > List? I think something like (just showing code for > ApiMessageAndVersion): > {code:java} > private List > deDupeApiMessageAndVersion(List messages) { > return new HashSet<>(messages).stream().collect(Collectors.toList()); > }{code} > should suffice as I don't think the ordering matters within the list of > response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
[ https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-13892: - Summary: Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager (was: Dedupe response objects in deleteAcls of AclControlManager ) > Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager > > > Key: KAFKA-13892 > URL: https://issues.apache.org/jira/browse/KAFKA-13892 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] > we loop through the ACL filters and and add RemoveAccessControlEntryRecord > records to the response list for each matching ACL. I think there's a bug > where if two filters match the same ACL, we create two > RemoveAccessControlEntryRecord records for that same ACL. This is an issue > because upon replay we throw an exception > (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) > if the ACL is not in the in-memory data structures which will happen to the > second RemoveAccessControlEntryRecord. > Maybe we can just de-dupe both List and > List? I think something like (just showing code for > ApiMessageAndVersion): > {code:java} > private List > deDupeApiMessageAndVersion(List messages) { > return new HashSet<>(messages).stream().collect(Collectors.toList()); > }{code} > should suffice as I don't think the ordering matters within the list of > response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-13892) Dedupe response objects in deleteAcls of AclControlManager
[ https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant reassigned KAFKA-13892: Assignee: Andrew Grant > Dedupe response objects in deleteAcls of AclControlManager > --- > > Key: KAFKA-13892 > URL: https://issues.apache.org/jira/browse/KAFKA-13892 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] > we loop through the ACL filters and and add RemoveAccessControlEntryRecord > records to the response list for each matching ACL. I think there's a bug > where if two filters match the same ACL, we create two > RemoveAccessControlEntryRecord records for that same ACL. This is an issue > because upon replay we throw an exception > (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) > if the ACL is not in the in-memory data structures which will happen to the > second RemoveAccessControlEntryRecord. > Maybe we can just de-dupe both List and > List? I think something like (just showing code for > ApiMessageAndVersion): > {code:java} > private List > deDupeApiMessageAndVersion(List messages) { > return new HashSet<>(messages).stream().collect(Collectors.toList()); > }{code} > should suffice as I don't think the ordering matters within the list of > response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] ijuma opened a new pull request, #12144: MINOR: reload4j dependency fixes
ijuma opened a new pull request, #12144: URL: https://github.com/apache/kafka/pull/12144 * Replace `log4j` with `reload4j` in `copyDependantLibs` * Exclude `log4j` and `slf4j-log4j12` transitive dependencies for 'streams:upgrade-system-tests`. Versions 0100 and 0101 had a transitive dependency to `log4j` and `slf4j-log4j12` via `zkclient` and `zookeeper`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13892) Dedupe response objects in deleteAcls of AclControlManager
Andrew Grant created KAFKA-13892: Summary: Dedupe response objects in deleteAcls of AclControlManager Key: KAFKA-13892 URL: https://issues.apache.org/jira/browse/KAFKA-13892 Project: Kafka Issue Type: Bug Reporter: Andrew Grant In [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143] we loop through the ACL filters and and add RemoveAccessControlEntryRecord records to the response list for each matching ACL. I think there's a bug where if two filters match the same ACL, we create two RemoveAccessControlEntryRecord records for that same ACL. This is an issue because upon replay we throw an exception (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195) if the ACL is not in the in-memory data structures which will happen to the second RemoveAccessControlEntryRecord. Maybe we can just de-dupe both List and List? I think something like (just showing code for ApiMessageAndVersion): {code:java} private List deDupeApiMessageAndVersion(List messages) { return new HashSet<>(messages).stream().collect(Collectors.toList()); }{code} should suffice as I don't think the ordering matters within the list of response objects. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mumrah commented on pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on PR #12050: URL: https://github.com/apache/kafka/pull/12050#issuecomment-1122497005 @dengziming addressing your old comment: > Thank you for this PR @mumrah , I have a question: should we bump IBP from 3.2-0 to 3.2-1, then in the future when migrating ZooKeeper clusters to KRaft, we can set metadataVersion=1 if the ibp is behind 3.2-0, and set metadataVersion=2 if ibp equals 3.2-1. With #12072, we have associated a `metadata.version` with each IBP starting with 3.0-IV0. In this PR, I've added 3.3-IV0 which will be metadata version 5. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r869326805 ## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ## @@ -102,6 +106,8 @@ class KafkaRaftServer( threadNamePrefix, controllerQuorumVotersFuture, KafkaRaftServer.configSchema, + raftApiVersions, + Some(bootstrapMetadata) Review Comment: That's an interesting point. For the broker, on first start, it will start up without a `metadata.version`. It won't be until the bootstrapped version is written by the controller and replicated to the broker that BrokerMetadataPublisher will update the FinalizedFeatureCache. This is probably okay though, since brokers start up in a fenced state in KRaft anyways. Since the `metadata.version` bootstrapping is done in the leader election callback on the controller, we should have the FeatureLevelRecord written out before any broker registrations occur. I think this should avoid any issues with a broker becoming unfenced before knowing what the `metadata.version` is. Does that make sense? Or did I misunderstand your question :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Gerrrr commented on pull request #12122: WIP: Upgrade tests for KAFKA-13769
Ge commented on PR #12122: URL: https://github.com/apache/kafka/pull/12122#issuecomment-1122378479 @mjsax Can you please review? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS
showuon commented on code in PR #12140: URL: https://github.com/apache/kafka/pull/12140#discussion_r869204407 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -807,6 +807,7 @@ public void handle(SyncGroupResponse syncResponse, } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + "Sent generation was {}", sentGeneration); +resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false); Review Comment: We might need to add a comment here to explain why we need to reset generation ID here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #12143: MINOR: Update release versions for upgrade tests with ongoing release
cadonna merged PR #12143: URL: https://github.com/apache/kafka/pull/12143 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aiquestion commented on pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS
aiquestion commented on PR #12140: URL: https://github.com/apache/kafka/pull/12140#issuecomment-1122343498 @[showuon](https://github.com/showuon) can you please help to review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi closed pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi closed pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap URL: https://github.com/apache/kafka/pull/9519 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi commented on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi commented on PR #9519: URL: https://github.com/apache/kafka/pull/9519#issuecomment-1122331820 Closing this due to inactivity from reviewers' part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aiquestion commented on pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS
aiquestion commented on PR #12140: URL: https://github.com/apache/kafka/pull/12140#issuecomment-1122315793 created a jira for it: https://issues.apache.org/jira/browse/KAFKA-13891 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
Shawn Wang created KAFKA-13891: -- Summary: sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive Key: KAFKA-13891 URL: https://issues.apache.org/jira/browse/KAFKA-13891 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.0.0 Reporter: Shawn Wang This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419] But the previous PR forgot to reset generation when sync group failed with rebalanceInProgress error. So the previous bug still exists and it may cause consumer to rebalance many rounds before final stable. Here's the example ({*}bold is added{*}): # consumer A joined and synced group successfully with generation 1 *( with ownedPartition P1/P2 )* # New rebalance started with generation 2, consumer A joined successfully, but somehow, consumer A doesn't send out sync group immediately # other consumer completed sync group successfully in generation 2, except consumer A. # After consumer A send out sync group, the new rebalance start, with generation 3. So consumer A got REBALANCE_IN_PROGRESS error with sync group response # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with generation 3, with the assignment (ownedPartition) in generation 1. # So, now, we have out-of-date ownedPartition sent, with unexpected results happened # *After the generation-3 rebalance, consumer A got P3/P4 partition. the ownedPartition is ignored because of old generation.* # *consumer A revoke P1/P2 and re-join to start a new round of rebalance* # *if some other consumer C failed to syncGroup before consumer A's joinGroup. the same issue will happens again and result in many rounds of rebalance before stable* -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] rajinisivaram merged pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases
rajinisivaram merged PR #12131: URL: https://github.com/apache/kafka/pull/12131 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases
rajinisivaram commented on PR #12131: URL: https://github.com/apache/kafka/pull/12131#issuecomment-1122212951 @chernyih Thanks for the PR, LGTM. Seems like a reasonable change since we are just ensuring that backoff is applied unless ApiVersions response is received. Since we were applying exponential backoff for authentication failures, delaying to ApiVersions response seems ok. Merging to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request, #12143: MINOR: Update release versions for upgrade tests with ongoing release
cadonna opened a new pull request, #12143: URL: https://github.com/apache/kafka/pull/12143 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sayantanu-dey commented on pull request #12035: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so
sayantanu-dey commented on PR #12035: URL: https://github.com/apache/kafka/pull/12035#issuecomment-1122126444 @guozhangwang @dajac sorry for being late, I went through the whole conversation and I have pushed the changes suggested. 1. Now I am returning the `close` status after resolving the future returned by `adminClient#removeMembersFromConsumerGroup` 2. Now I have also added `membersToRemove` in the constructor `RemoveMembersFromConsumerGroupOptions`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #12113: MINOR: Small cleanups in connect/mirror
showuon commented on PR #12113: URL: https://github.com/apache/kafka/pull/12113#issuecomment-1121995342 Failed tests are unrelated: ``` Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment() Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #12113: MINOR: Small cleanups in connect/mirror
showuon merged PR #12113: URL: https://github.com/apache/kafka/pull/12113 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org