[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type
chia7712 commented on a change in pull request #10162: URL: https://github.com/apache/kafka/pull/10162#discussion_r579615450 ## File path: clients/src/main/resources/common/message/README.md ## @@ -85,6 +87,8 @@ There are several primitive field types available. * "bytes": binary data. +* "records": record set used in fetch api and fetch snapshot api Review comment: `records` is used by produce data also. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10124: MINOR: apply Utils.isBlank to code base
chia7712 merged pull request #10124: URL: https://github.com/apache/kafka/pull/10124 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10124: MINOR: apply Utils.isBlank to code base
chia7712 commented on pull request #10124: URL: https://github.com/apache/kafka/pull/10124#issuecomment-782552393 `kafka.admin.ListConsumerGroupTest.testListConsumerGroupsWithStates()` unrelated error This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10149: AbstractCoordinator should log with its subclass
chia7712 merged pull request #10149: URL: https://github.com/apache/kafka/pull/10149 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10148: Update KafkaConsumerMetrics.java
chia7712 merged pull request #10148: URL: https://github.com/apache/kafka/pull/10148 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12335) Upgrade junit from 5.7.0 to 5.7.1
[ https://issues.apache.org/jira/browse/KAFKA-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12335. Fix Version/s: 3.0.0 Resolution: Fixed > Upgrade junit from 5.7.0 to 5.7.1 > - > > Key: KAFKA-12335 > URL: https://issues.apache.org/jira/browse/KAFKA-12335 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.0.0 > > > junit 5.7.1 release notes: > [https://junit.org/junit5/docs/5.7.1/release-notes/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #10145: KAFKA-12335 Upgrade junit from 5.7.0 to 5.7.1
chia7712 merged pull request #10145: URL: https://github.com/apache/kafka/pull/10145 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10145: KAFKA-12335 Upgrade junit from 5.7.0 to 5.7.1
chia7712 commented on pull request #10145: URL: https://github.com/apache/kafka/pull/10145#issuecomment-782549839 `Build / JDK 15 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector` #10077 `org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings` #10152 #10158 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12339) Add retry to admin client's listOffsets
[ https://issues.apache.org/jira/browse/KAFKA-12339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-12339: --- Summary: Add retry to admin client's listOffsets (was: Starting new connector cluster with new internal topics encounters UnknownTopicOrPartitionException) > Add retry to admin client's listOffsets > --- > > Key: KAFKA-12339 > URL: https://issues.apache.org/jira/browse/KAFKA-12339 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2 >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > > After upgrading our connector env to 2.9.0-SNAPSHOT, sometimes the connect > cluster encounters following error. > {quote}Uncaught exception in herder work thread, exiting: > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:324) > org.apache.kafka.connect.errors.ConnectException: Error while getting end > offsets for topic 'connect-storage-topic-connect-cluster-1' > at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:689) > at > org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338) > at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195) > at > org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:216) > at > org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:129) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:310) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668) > ... 10 more > {quote} > [https://github.com/apache/kafka/pull/9780] added shared admin to get end > offsets. KafkaAdmin#listOffsets does not handle topic-level error, hence the > UnknownTopicOrPartitionException on topic-level can obstruct worker from > running when the new internal topic is NOT synced to all brokers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
chia7712 commented on pull request #10152: URL: https://github.com/apache/kafka/pull/10152#issuecomment-782527168 > Also might be good to note in the description that the error could be retriable in a same way as the one on partition metadata and it's the fact that it is not retried (here or in the connect worker) that creates the issue. Should we cherry-pick this patch to other active branches? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
chia7712 commented on pull request #10152: URL: https://github.com/apache/kafka/pull/10152#issuecomment-782526238 @rhauch @kkonstantine Thanks for your reviews! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe merged pull request #10070: URL: https://github.com/apache/kafka/pull/10070 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on pull request #10070: URL: https://github.com/apache/kafka/pull/10070#issuecomment-782521288 Thanks, @junrao ! :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10817) Add clusterId validation to Fetch handling
[ https://issues.apache.org/jira/browse/KAFKA-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287447#comment-17287447 ] dengziming commented on KAFKA-10817: ClusterId validation is only applied to Fetch requests? should we also apply it to BeginQuorum EndQuorum FetchSnapshot? > Add clusterId validation to Fetch handling > -- > > Key: KAFKA-10817 > URL: https://issues.apache.org/jira/browse/KAFKA-10817 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Initially we were unsure how the clusterId would be generated by the cluster > after Zookeeper removal, so we did not implement it. It is looking now like > we will probably require users to generate it manually prior to starting the > cluster. See here for details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorum-based+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-Fencing. > In this case, we can assume that the clusterId will be provided when > instantiating the raft client, so we can add the logic to the request handler > to validate it in inbound Fetch requests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579565540 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,900 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private final int partitionEpoch; + +
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579564654 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.IsrChangeRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.QuotaRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureManager; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +public final class QuorumController implements Controller { +/** + * A builder class which creates the QuorumController. + */ +static public class Builder { +private final int nodeId; +private Time time = Time.SYSTEM; +private String threadNamePrefix = null; +private LogContext logContext = null; +private Map configDefs = Collections.emptyMap(); +private MetaLogManager logManager = null; +private Map supportedFeatures = Collections.emptyMap(); +private short defaultReplicationFactor = 3; +private int defaultNumPartitions = 1; +private ReplicaPlacementPolicy replicaPlacementPolicy = +new SimpleReplicaPlacementPolicy(new Random()); +private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS); + +public Builder(int nodeId) { +this.nodeId = nodeId; +} + +public B
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r578003900 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.IsrChangeRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.QuotaRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureManager; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +public final class QuorumController implements Controller { +/** + * A builder class which creates the QuorumController. + */ +static public class Builder { +private final int nodeId; +private Time time = Time.SYSTEM; +private String threadNamePrefix = null; +private LogContext logContext = null; +private Map configDefs = Collections.emptyMap(); +private MetaLogManager logManager = null; +private Map supportedFeatures = Collections.emptyMap(); +private short defaultReplicationFactor = 3; +private int defaultNumPartitions = 1; +private ReplicaPlacementPolicy replicaPlacementPolicy = +new SimpleReplicaPlacementPolicy(new Random()); +private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS); + +public Builder(int nodeId) { +this.nodeId = nodeId; +} + +public Bu
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r578065045 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.IsrChangeRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.QuotaRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureManager; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +public final class QuorumController implements Controller { +/** + * A builder class which creates the QuorumController. + */ +static public class Builder { +private final int nodeId; +private Time time = Time.SYSTEM; +private String threadNamePrefix = null; +private LogContext logContext = null; +private Map configDefs = Collections.emptyMap(); +private MetaLogManager logManager = null; +private Map supportedFeatures = Collections.emptyMap(); +private short defaultReplicationFactor = 3; +private int defaultNumPartitions = 1; +private ReplicaPlacementPolicy replicaPlacementPolicy = +new SimpleReplicaPlacementPolicy(new Random()); +private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS); + +public Builder(int nodeId) { +this.nodeId = nodeId; +} + +public Bu
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579564579 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.IsrChangeRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.QuotaRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureManager; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +public final class QuorumController implements Controller { +/** + * A builder class which creates the QuorumController. + */ +static public class Builder { +private final int nodeId; +private Time time = Time.SYSTEM; +private String threadNamePrefix = null; +private LogContext logContext = null; +private Map configDefs = Collections.emptyMap(); +private MetaLogManager logManager = null; +private Map supportedFeatures = Collections.emptyMap(); +private short defaultReplicationFactor = 3; +private int defaultNumPartitions = 1; +private ReplicaPlacementPolicy replicaPlacementPolicy = +new SimpleReplicaPlacementPolicy(new Random()); +private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS); + +public Builder(int nodeId) { +this.nodeId = nodeId; +} + +public B
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579564268 ## File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.FeatureMap; +import org.apache.kafka.metadata.FeatureMapAndEpoch; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineHashSet; + + +public class FeatureControlManager { +/** + * The features supported by this controller's software. + */ +private final Map supportedFeatures; + +/** + * Maps feature names to finalized version ranges. + */ +private final TimelineHashMap finalizedVersions; + +/** + * The latest feature epoch. + */ +private final TimelineHashSet epoch; + +FeatureControlManager(Map supportedFeatures, + SnapshotRegistry snapshotRegistry) { +this.supportedFeatures = supportedFeatures; +this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0); +this.epoch = new TimelineHashSet<>(snapshotRegistry, 0); +} + +ControllerResult> updateFeatures( +Map updates, Set downgradeables, +Map> brokerFeatures) { +TreeMap results = new TreeMap<>(); +List records = new ArrayList<>(); +for (Entry entry : updates.entrySet()) { +results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), +downgradeables.contains(entry.getKey()), brokerFeatures, records)); +} +return new ControllerResult<>(records, results); +} + +private ApiError updateFeature(String featureName, + VersionRange newRange, + boolean downgradeable, + Map> brokerFeatures, + List records) { +if (newRange.min() <= 0) { +return new ApiError(Errors.INVALID_UPDATE_VERSION, +"The lower value for the new range cannot be less than 1."); +} +if (newRange.max() <= 0) { +return new ApiError(Errors.INVALID_UPDATE_VERSION, +"The upper value for the new range cannot be less than 1."); +} +VersionRange localRange = supportedFeatures.get(featureName); Review comment: let's revisit this after 2.8 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9672) Dead brokers in ISR cause isr-expiration to fail with exception
[ https://issues.apache.org/jira/browse/KAFKA-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-9672. Fix Version/s: 3.0.0 Resolution: Fixed merged the PR to trunk > Dead brokers in ISR cause isr-expiration to fail with exception > --- > > Key: KAFKA-9672 > URL: https://issues.apache.org/jira/browse/KAFKA-9672 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0, 2.4.1 >Reporter: Ivan Yurchenko >Assignee: Jose Armando Garcia Sancio >Priority: Major > Fix For: 3.0.0 > > > We're running Kafka 2.4 and facing a pretty strange situation. > Let's say there were three brokers in the cluster 0, 1, and 2. Then: > 1. Broker 3 was added. > 2. Partitions were reassigned from broker 0 to broker 3. > 3. Broker 0 was shut down (not gracefully) and removed from the cluster. > 4. We see the following state in ZooKeeper: > {code:java} > ls /brokers/ids > [1, 2, 3] > get /brokers/topics/foo > {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}} > get /brokers/topics/foo/partitions/0/state > {"controller_epoch":123,"leader":1,"version":1,"leader_epoch":42,"isr":[0,2,3,1]} > {code} > It means, the dead broker 0 remains in the partitions's ISR. A big share of > the partitions in the cluster have this issue. > This is actually causing an errors: > {code:java} > Uncaught exception in scheduled task 'isr-expiration' > (kafka.utils.KafkaScheduler) > org.apache.kafka.common.errors.ReplicaNotAvailableException: Replica with id > 12 is not available on broker 17 > {code} > It means that effectively {{isr-expiration}} task is not working any more. > I have a suspicion that this was introduced by [this commit (line > selected)|https://github.com/apache/kafka/commit/57baa4079d9fc14103411f790b9a025c9f2146a4#diff-5450baca03f57b9f2030f93a480e6969R856] > Unfortunately, I haven't been able to reproduce this in isolation. > Any hints about how to reproduce (so I can write a patch) or mitigate the > issue on a running cluster are welcome. > Generally, I assume that not throwing {{ReplicaNotAvailableException}} on a > dead (i.e. non-existent) broker, considering them out-of-sync and removing > from the ISR should fix the problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao merged pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas
junrao merged pull request #9631: URL: https://github.com/apache/kafka/pull/9631 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579553251 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +/** + * A special value used to represent the leader for a partition with no leader. + */ +public static final int NO_LEADER = -1; + +/** + * A special value used to represent a PartitionChangeRecord that does not change the + * partition leader. + */ +public static final int NO_LEADER_CHANGE = -2; + +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(sn
[jira] [Updated] (KAFKA-12343) Recent change to use SharedTopicAdmin in KakfkaBasedLog fails with AK 0.10.x brokers
[ https://issues.apache.org/jira/browse/KAFKA-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12343: -- Fix Version/s: 2.6.2 2.7.1 2.8.0 2.5.2 > Recent change to use SharedTopicAdmin in KakfkaBasedLog fails with AK 0.10.x > brokers > > > Key: KAFKA-12343 > URL: https://issues.apache.org/jira/browse/KAFKA-12343 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > > System test failure > ([sample|http://confluent-kafka-2-7-system-test-results.s3-us-west-2.amazonaws.com/2021-02-18--001.1613655226--confluentinc--2.7--54952635e5/report.html]): > {code:java} > Java.lang.Exception: UnsupportedVersionException: MetadataRequest versions > older than 4 don't support the allowAutoTopicCreation field > at > org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:755) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1136) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1224) > at java.lang.Thread.run(Thread.java:748) > [2021-02-16 12:05:11,735] ERROR [Worker clientId=connect-1, > groupId=connect-cluster] Uncaught exception in herder work thread, exiting: > (org.apache.kafka.connect.runtime.distributed.Di > stributedHerder) > org.apache.kafka.connect.errors.ConnectException: API to get the get the end > offsets for topic 'connect-offsets' is unsupported on brokers at worker25:9092 > at > org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:680) > at > org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338) > at > org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195) > at > org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:136) > at org.apache.kafka.connect.runtime.Worker.start(Worker.java:197) > at > org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:128) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:311) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest > versions older than 4 don't support the allowAutoTopicCre > ation field > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668) > ... 11 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12343) Recent change to use SharedTopicAdmin in KakfkaBasedLog fails with AK 0.10.x brokers
[ https://issues.apache.org/jira/browse/KAFKA-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-12343. --- Reviewer: Konstantine Karantasis Resolution: Fixed > Recent change to use SharedTopicAdmin in KakfkaBasedLog fails with AK 0.10.x > brokers > > > Key: KAFKA-12343 > URL: https://issues.apache.org/jira/browse/KAFKA-12343 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > > System test failure > ([sample|http://confluent-kafka-2-7-system-test-results.s3-us-west-2.amazonaws.com/2021-02-18--001.1613655226--confluentinc--2.7--54952635e5/report.html]): > {code:java} > Java.lang.Exception: UnsupportedVersionException: MetadataRequest versions > older than 4 don't support the allowAutoTopicCreation field > at > org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:755) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1136) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1224) > at java.lang.Thread.run(Thread.java:748) > [2021-02-16 12:05:11,735] ERROR [Worker clientId=connect-1, > groupId=connect-cluster] Uncaught exception in herder work thread, exiting: > (org.apache.kafka.connect.runtime.distributed.Di > stributedHerder) > org.apache.kafka.connect.errors.ConnectException: API to get the get the end > offsets for topic 'connect-offsets' is unsupported on brokers at worker25:9092 > at > org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:680) > at > org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338) > at > org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195) > at > org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:136) > at org.apache.kafka.connect.runtime.Worker.start(Worker.java:197) > at > org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:128) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:311) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest > versions older than 4 don't support the allowAutoTopicCre > ation field > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668) > ... 11 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio edited a comment on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas
jsancio edited a comment on pull request #9631: URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265 @junrao I merged the latest trunk to this PR. Jenkins ran the tests and all the failures look unrelated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas
jsancio commented on pull request #9631: URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265 @junrao I merged the latest master to this PR. Jenkins ran the tests and all the failures look unrelated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579547910 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579547825 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579547731 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[jira] [Created] (KAFKA-12349) Follow up on PartitionEpoch in KIP-500
Colin McCabe created KAFKA-12349: Summary: Follow up on PartitionEpoch in KIP-500 Key: KAFKA-12349 URL: https://issues.apache.org/jira/browse/KAFKA-12349 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe * Remove the compatibility shim between raft and the kip-500 controller * standardize on the epoch data type (probably int) * review partition epoch, leader epoch -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579547404 ## File path: metadata/src/main/resources/common/metadata/PartitionRecord.json ## @@ -34,6 +34,8 @@ { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "about": "The lead replica, or -1 if there is no leader." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", - "about": "An epoch that gets incremented each time we change the ISR." } + "about": "An epoch that gets incremented each time we change the ISR." }, +{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", Review comment: filed KAFKA-12349 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579547098 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +/** + * A special value used to represent the leader for a partition with no leader. + */ +public static final int NO_LEADER = -1; + +/** + * A special value used to represent a PartitionChangeRecord that does not change the + * partition leader. + */ +public static final int NO_LEADER_CHANGE = -2; + +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(s
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287431#comment-17287431 ] Guozhang Wang commented on KAFKA-12169: --- Inside the AbstractCoordinator: {code} public synchronized boolean checkAssignmentMatchedSubscription(Collection assignments) { for (TopicPartition topicPartition : assignments) { if (this.subscribedPattern != null) { if (!this.subscribedPattern.matcher(topicPartition.topic()).matches()) { log.info("Assigned partition {} for non-subscribed topic regex pattern; subscription pattern is {}", topicPartition, this.subscribedPattern); return false; } } else { if (!this.subscription.contains(topicPartition.topic())) { log.info("Assigned partition {} for non-subscribed topic; subscription is {}", topicPartition, this.subscription); return false; } } } return true; } {code} We only check that the topic names are included in the subscription, but do not check their num.partitions against the metadata. > Consumer can not know paritions change when client leader restart with static > membership protocol > - > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > Labels: bug > > Background: > Kafka consumer services run with static membership and cooperative rebalance > protocol on kubernetes, and services often restart because of operation. When > we added partitions from 1000 to 2000 for the topic, client leader restart > with unknown member id at the same time, we found the consumers do not > tigger rebalance and still consume 1000 paritions > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579543547 ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { +final MockConsumer emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +emptyConsumer.assign(Collections.singletonList(topicPartition)); + +final Map endOffsets = new HashMap<>(); +endOffsets.put(topicPartition, 0L); +emptyConsumer.updateEndOffsets(endOffsets); + +final Map beginningOffsets = new HashMap<>(); +beginningOffsets.put(topicPartition, 0L); +emptyConsumer.updateBeginningOffsets(beginningOffsets); + +streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L); + +final ConsumerRecords records = emptyConsumer.poll(Duration.ofMillis(500)); +assertEquals(0, records.count()); Review comment: @mjsax I have changed the PR to make sure that the tests verify the committed offset. It seems to me, however, that the original test implementer intent was to verify the position of the consumer after the method call - not if it committed the offset. Specially because the ```position()``` and ```commitAsync()``` calls happen outside the methods under test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
cmccabe commented on pull request #10094: URL: https://github.com/apache/kafka/pull/10094#issuecomment-782461539 ran test manually and committed from command line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe closed pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
cmccabe closed pull request #10094: URL: https://github.com/apache/kafka/pull/10094 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579538752 ## File path: metadata/src/main/resources/common/metadata/PartitionRecord.json ## @@ -34,6 +34,8 @@ { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "about": "The lead replica, or -1 if there is no leader." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", - "about": "An epoch that gets incremented each time we change the ISR." } + "about": "An epoch that gets incremented each time we change the ISR." }, +{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", Review comment: Could you file a separate jira to follow up on PartitionEpoch post 2.8? ## File path: metadata/src/main/resources/common/metadata/PartitionRecord.json ## @@ -34,6 +34,8 @@ { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "about": "The lead replica, or -1 if there is no leader." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", - "about": "An epoch that gets incremented each time we change the ISR." } + "about": "An epoch that gets incremented each time we change the ISR." }, +{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", Review comment: Could you file a separate jira to follow up on PartitionEpoch post 2.8? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry
[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties
tinawenqiao commented on pull request #9235: URL: https://github.com/apache/kafka/pull/9235#issuecomment-782437733 > Thanks for the update, it looks much better. > > I should have been clearer, I meant to update the documentation text of these settings, not just the comments in `WorkerConfigs`. > > For example in https://github.com/apache/kafka/pull/9235/files#diff-5664ddd985065623bab1870e031ce4f3fcfee0bc4951e41b6b0e1584cd32fa2dR153, we could do something like: > > ```java > private static final String REST_HOST_NAME_DOC > = "Hostname for the REST API. If this is set, it will only bind to this interface. Deprecated, only used when listeners is not set. Use listeners instead."; > ``` Doc desc has been added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch merged pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
rhauch merged pull request #10152: URL: https://github.com/apache/kafka/pull/10152 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579522827 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579522201 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[jira] [Resolved] (KAFKA-10817) Add clusterId validation to Fetch handling
[ https://issues.apache.org/jira/browse/KAFKA-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10817. - Resolution: Fixed > Add clusterId validation to Fetch handling > -- > > Key: KAFKA-10817 > URL: https://issues.apache.org/jira/browse/KAFKA-10817 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Initially we were unsure how the clusterId would be generated by the cluster > after Zookeeper removal, so we did not implement it. It is looking now like > we will probably require users to generate it manually prior to starting the > cluster. See here for details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorum-based+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-Fencing. > In this case, we can assume that the clusterId will be provided when > instantiating the raft client, so we can add the logic to the request handler > to validate it in inbound Fetch requests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling
hachikuji merged pull request #10129: URL: https://github.com/apache/kafka/pull/10129 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling
hachikuji commented on pull request #10129: URL: https://github.com/apache/kafka/pull/10129#issuecomment-782423782 I verified tests locally. I am going to merge to trunk and 2.8 since we seem blocked by jenkins at the moment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988 ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { +final MockConsumer emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +emptyConsumer.assign(Collections.singletonList(topicPartition)); + +final Map endOffsets = new HashMap<>(); +endOffsets.put(topicPartition, 0L); +emptyConsumer.updateEndOffsets(endOffsets); + +final Map beginningOffsets = new HashMap<>(); +beginningOffsets.put(topicPartition, 0L); +emptyConsumer.updateBeginningOffsets(beginningOffsets); + +streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L); + +final ConsumerRecords records = emptyConsumer.poll(Duration.ofMillis(500)); +assertEquals(0, records.count()); Review comment: @mjsax I guess got your point. In the CUT the call ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` makes the consumer client to seek that offset (if available) without a commit. The call to commit offset happens in another section of the code that is not under test there (line 407 of StreamsResetter.java). I will change the test logic so that the "given condition" uses ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` and "when condition" is actually the call of ```client.commitSync();```. This way, the "then condition" would be able to test using the commited offsets. Same applies for the other test. ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { +final MockConsumer emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +emptyConsumer.assign(Collections.singletonList(topicPartition)); + +final Map endOffsets = new HashMap<>(); +endOffsets.put(topicPartition, 0L); +emptyConsumer.updateEndOffsets(endOffsets); + +final Map beginningOffsets = new HashMap<>(); +beginningOffsets.put(topicPartition, 0L); +emptyConsumer.updateBeginningOffsets(beginningOffsets); + +streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L); + +final ConsumerRecords records = emptyConsumer.poll(Duration.ofMillis(500)); +assertEquals(0, records.count()); Review comment: @mjsax I guess got your point. In the CUT the call ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` makes the consumer client to seek that offset (if available) without a commit. The call to commit offset happens in another section of the code that is not under test there (line 407 of StreamsResetter.java). I will change the test logic so that the "given condition" uses ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` and "when condition" is actually the call of ```client.commitSync();```. This way, the "then condition" would be able to test using the committed offsets. Same applies for the other test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
hachikuji commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r579516434 ## File path: settings.gradle ## @@ -29,6 +29,7 @@ include 'clients', 'log4j-appender', 'metadata', 'raft', +'shell', Review comment: Ok. I was mainly objecting to the generality of the name since this it is focused only on the metadata for KIP-500. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579504541 ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { Review comment: You are right - indeed the test is orthogonal to the fix. I have decided to add it there due to the lack of any test scenario for reseting partition offset (to a specific offset) on an empty partition. The original CUT passed this test without the bug-fix. I considered an important scenario to test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579502090 ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer client, final Map topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes); for (final TopicPartition topicPartition : inputTopicPartitions) { -client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset()); +final Optional partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition)) +.map(OffsetAndTimestamp::offset) +.filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET); +if (partitionOffset.isPresent()) { +client.seek(topicPartition, partitionOffset.get()); +} else { +client.seekToEnd(Collections.singletonList(topicPartition)); +System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + +" is empty, without a committed record. Falling back to latest known offset."); Review comment: I see your point, I don't mind removing "without a committed record" part of the message. @jeqo This would have to be updated on the scala code too, since I saw that the messages are about the same. Would that be ok? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579497748 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fina
[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287386#comment-17287386 ] fml2 edited comment on KAFKA-12328 at 2/19/21, 9:53 PM: {quote}In general, the ideas of the "supplier pattern" is _inherently_ to return a new object each time it's invoked {quote} According to my understanding, the supplier pattern gives you _the possibilty_ to return a new instance on each invocation – thus adjusting the result to the current situation if needed. But, as a pattern, it does not prohibit to also return the same instance if this flexibility is not needed. was (Author: fml2): {quote}In general, the ideas of the "supplier pattern" is _inherently_ to return a new object each time it's invoked {quote} According to my understanding, the supplier pattern gives you _the possibilty_ to return a new instance on each invoke – thus adjusting the result to the current situation if needed. But, as a pattern, it does not prohibit to also return the same instance if this flexibility is not needed. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287386#comment-17287386 ] fml2 commented on KAFKA-12328: -- {quote}In general, the ideas of the "supplier pattern" is _inherently_ to return a new object each time it's invoked {quote} According to my understanding, the supplier pattern gives you _the possibilty_ to return a new instance on each invoke – thus adjusting the result to the current situation if needed. But, as a pattern, it does not prohibit to also return the same instance if this flexibility is not needed. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579496903 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fina
[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287372#comment-17287372 ] fml2 edited comment on KAFKA-12328 at 2/19/21, 9:50 PM: {quote}ie, the supplier must return a new object each time it's invoked {quote} Yes, that must be the error! I return the same instance in each supplier invokation. The docs does not express it clearly enough IMO that a NEW instance must be returned each time. Could you please elaborate on this? Why is it required that the supplier must return a new instance on each invokation? If I return the same instance which is thread safe (it gets all the thread specific informations from the context) then a single instance would suffice. The context should be a proxy object in this case. I.e. it would be a "static" (not chaning) value within the transformer/processor but would internally return the data specific to the particular task in each call. Similar to proxy objects in Spring. But, apparently, this is not how kafka is implemented now so that I have to do this myself, right? was (Author: fml2): {quote}ie, the supplier must return a new object each time it's invoked {quote} Yes, that must be the error! I return the same instance in each supplier invokation. The docs does not express it clearly enough IMO that a NEW instance must be returned each time. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9270. Resolution: Fixed > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9274. Fix Version/s: 2.8.0 Resolution: Fixed > Gracefully handle timeout exceptions on Kafka Streams > - > > Key: KAFKA-9274 > URL: https://issues.apache.org/jira/browse/KAFKA-9274 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 2.8.0 > > > Right now streams don't treat timeout exception as retriable in general by > throwing it to the application level. If not handled by the user, this would > kill the stream thread unfortunately. > In fact, timeouts happen mostly due to network issue or server side > unavailability. Hard failure on client seems to be an over-kill. > We would like to discuss what's the best practice to handle timeout > exceptions on Streams. The current state is still brainstorming and > consolidate all the cases that contain timeout exception within this ticket. > This ticket is backed by KIP-572: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax merged pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
mjsax merged pull request #10072: URL: https://github.com/apache/kafka/pull/10072 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287375#comment-17287375 ] Matthias J. Sax commented on KAFKA-12328: - We actually just updated the JavaDocs for the upcoming 2.8.0 release: https://issues.apache.org/jira/browse/KAFKA-10036 In general, the ideas of the "supplier pattern" is _inherently_ to return a new object each time it's invoked. Otherwise, the API could be simpler, ie, you could just pass a `Processor` object directly (instead of a supplier). But it seem that many people don't know about the "supplier pattern"... > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine commented on a change in pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
kkonstantine commented on a change in pull request #10152: URL: https://github.com/apache/kafka/pull/10152#discussion_r579485651 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java ## @@ -82,6 +82,7 @@ public long deadline() { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { +if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); Review comment: nit: a stylistic observation is that this call is exactly the same as the call below. Yet it's written differently. Styles differ, usually not too much, from module to module. I think it's good to keep existing styles to help with readability when the changes don't require a greater change. ```suggestion if (shouldRefreshMetadata(tm.error())) { throw tm.error().exception(); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
kkonstantine commented on a change in pull request #10152: URL: https://github.com/apache/kafka/pull/10152#discussion_r579485651 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java ## @@ -82,6 +82,7 @@ public long deadline() { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { +if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); Review comment: nit: a stylistic observation is that this call is exactly the same as the call below. Yet it's written differently. Styles differ, usually not too much, from module to module. I think it's good to keep existing styles to help with readability and when the changes don't require a greater change. ```suggestion if (shouldRefreshMetadata(tm.error())) { throw tm.error().exception(); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
kkonstantine commented on a change in pull request #10152: URL: https://github.com/apache/kafka/pull/10152#discussion_r579485651 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java ## @@ -82,6 +82,7 @@ public long deadline() { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { +if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); Review comment: nit: a stylistic observation is that this call is exactly the same as the call below. Yet it's written differently. Styles differ, usually not too much, from module to module. I think it's good to keep existing styles to help with readability and when the changes don't require a greater change. ```suggestion if (shouldRefreshMetadata(tm.error())) { throw tm.error().exception(); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287372#comment-17287372 ] fml2 commented on KAFKA-12328: -- {quote}ie, the supplier must return a new object each time it's invoked {quote} Yes, that must be the error! I return the same instance in each supplier invokation. The docs does not express it clearly enough IMO that a NEW instance must be returned each time. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287357#comment-17287357 ] Boyang Chen commented on KAFKA-12169: - [~guozhang] One question regarding T1, when we receive the assignment, do we validate its total partitions against our own metadata? If so, the leader should attempt a rejoin IMHO. > Consumer can not know paritions change when client leader restart with static > membership protocol > - > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > Labels: bug > > Background: > Kafka consumer services run with static membership and cooperative rebalance > protocol on kubernetes, and services often restart because of operation. When > we added partitions from 1000 to 2000 for the topic, client leader restart > with unknown member id at the same time, we found the consumers do not > tigger rebalance and still consume 1000 paritions > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579464755 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -397,6 +397,9 @@ class BrokerServer( info("shutting down") if (config.controlledShutdownEnable) { +// Shut down the broker metadata listener, so that we don't get added to any +// more ISRs. +brokerMetadataListener.beginShutdown() Review comment: It does need to be because there are some paths through the code that don't go through here. In general calling `beginShutdown` or `close` multiple times is harmless-- only the first time has an effect. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch merged pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch merged pull request #10158: URL: https://github.com/apache/kafka/pull/10158 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579463797 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579463797 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579462462 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji commented on pull request #10157: URL: https://github.com/apache/kafka/pull/10157#issuecomment-782330736 We are missing the handling of authentication and unsupported version exceptions in BrokerToControllerChannelManager. I am working on the test cases now and will submit an update shortly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579458466 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fin
[GitHub] [kafka] ableegoldman commented on pull request #10151: MINOR: Correct warning
ableegoldman commented on pull request #10151: URL: https://github.com/apache/kafka/pull/10151#issuecomment-782323389 Merged to trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10151: MINOR: Correct warning
ableegoldman merged pull request #10151: URL: https://github.com/apache/kafka/pull/10151 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12169: -- Labels: bug (was: ) > Consumer can not know paritions change when client leader restart with static > membership protocol > - > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > Labels: bug > > Background: > Kafka consumer services run with static membership and cooperative rebalance > protocol on kubernetes, and services often restart because of operation. When > we added partitions from 1000 to 2000 for the topic, client leader restart > with unknown member id at the same time, we found the consumers do not > tigger rebalance and still consume 1000 paritions > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10151: MINOR: Correct warning
ableegoldman commented on pull request #10151: URL: https://github.com/apache/kafka/pull/10151#issuecomment-782322194 Some unrelated test failures: ``` Build / JDK 15 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining Build / JDK 8 / kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitions() Build / JDK 8 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10149: AbstractCoordinator should log with its subclass
chia7712 commented on pull request #10149: URL: https://github.com/apache/kafka/pull/10149#issuecomment-782314292 > Or are you suggesting we apply this change more broadly? Yep, I feel this change can be applied more broadly :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
mjsax commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579446056 ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -247,6 +268,27 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() { assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic")); } +@Test +public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() { +final MockConsumer emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST); +emptyConsumer.assign(Collections.singletonList(topicPartition)); + +final Map endOffsets = new HashMap<>(); +endOffsets.put(topicPartition, 0L); +emptyConsumer.updateEndOffsets(endOffsets); + +final Map beginningOffsets = new HashMap<>(); +beginningOffsets.put(topicPartition, 0L); +emptyConsumer.updateBeginningOffsets(beginningOffsets); + +final long yesterdayTimestamp = Instant.now().minus(Duration.ofDays(1)).toEpochMilli(); + +streamsResetter.resetToDatetime(emptyConsumer, inputTopicPartitions, yesterdayTimestamp); + +final ConsumerRecords records = emptyConsumer.poll(Duration.ofMillis(500)); +assertEquals(0, records.count()); Review comment: Some comment as above. ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { +final MockConsumer emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +emptyConsumer.assign(Collections.singletonList(topicPartition)); + +final Map endOffsets = new HashMap<>(); +endOffsets.put(topicPartition, 0L); +emptyConsumer.updateEndOffsets(endOffsets); + +final Map beginningOffsets = new HashMap<>(); +beginningOffsets.put(topicPartition, 0L); +emptyConsumer.updateBeginningOffsets(beginningOffsets); + +streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L); + +final ConsumerRecords records = emptyConsumer.poll(Duration.ofMillis(500)); +assertEquals(0, records.count()); Review comment: Not sure if I understand this test. As we use a `MockConsumer` and we never call `addRecord()` this condition should be `true` independent of `StreamsResetter`. Should we not rather verify if `streamsResetter` did _commit_ offsets as expected? ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer client, final Map topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes); for (final TopicPartition topicPartition : inputTopicPartitions) { -client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset()); +final Optional partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition)) +.map(OffsetAndTimestamp::offset) +.filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET); +if (partitionOffset.isPresent()) { +client.seek(topicPartition, partitionOffset.get()); +} else { +client.seekToEnd(Collections.singletonList(topicPartition)); +System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + +" is empty, without a committed record. Falling back to latest known offset."); Review comment: > without a committed record Not sure what this means? Does `is empty` need any clarification? ## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ## @@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { assertEquals(3, records.count()); } +@Test +public void testResetToSpecificOffsetWhenPartitionIsEmpty() { Review comment: This test seems to be orthogonal to the fix. Was just wondering why we add it and what it's purpose is? It's always good to close testing gaps, just not sure if I understand what this test really tests? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
ableegoldman commented on a change in pull request #10072: URL: https://github.com/apache/kafka/pull/10072#discussion_r579430512 ## File path: docs/streams/upgrade-guide.html ## @@ -127,6 +127,15 @@ Streams API into the constructor, it is no longer required to set mandatory configuration parameters (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument";>KIP-680). + +Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client. +If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed +task in the next iteration. +To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes). +If a task does not make progress within the specified task timeout (the timeout it tracked on a per-task basis) Review comment: nit: the timeout _is_ tracked on a per-task basis ## File path: docs/streams/upgrade-guide.html ## @@ -127,6 +127,15 @@ Streams API into the constructor, it is no longer required to set mandatory configuration parameters (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument";>KIP-680). + +Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client. +If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed +task in the next iteration. +To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes). +If a task does not make progress within the specified task timeout (the timeout it tracked on a per-task basis) Review comment: or better yet ```suggestion If a task does not make progress within the specified task timeout, which is tracked on a per-task basis, ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579438984 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fina
[GitHub] [kafka] gardnervickers closed pull request #6742: Improve performance of checkpointHighWatermarks, patch 2/2
gardnervickers closed pull request #6742: URL: https://github.com/apache/kafka/pull/6742 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress
[ https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12347: Description: Add metrics to track records being consumed fully and to tell if tasks are idling. This will allow users of streams to build uptime metrics around streams with less difficulty. KIP-715: https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams was:Add metrics to track records being consumed fully and to tell if tasks are idling. This will allow users of streams to build uptime metrics around streams with less difficulty. > Improve Kafka Streams ability to track progress > --- > > Key: KAFKA-12347 > URL: https://issues.apache.org/jira/browse/KAFKA-12347 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: kip > Fix For: 3.0.0 > > > Add metrics to track records being consumed fully and to tell if tasks are > idling. This will allow users of streams to build uptime metrics around > streams with less difficulty. > KIP-715: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress
[ https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12347: Labels: kip (was: need-kip) > Improve Kafka Streams ability to track progress > --- > > Key: KAFKA-12347 > URL: https://issues.apache.org/jira/browse/KAFKA-12347 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: kip > Fix For: 3.0.0 > > > Add metrics to track records being consumed fully and to tell if tasks are > idling. This will allow users of streams to build uptime metrics around > streams with less difficulty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287324#comment-17287324 ] Matthias J. Sax edited comment on KAFKA-12328 at 2/19/21, 7:44 PM: --- If this happens in "steady state" it would be very suspicious. You monitor / verify the task assignment from the logs, that should give you more intel. The other thing coming to my mind is, that it could be programming error: You cannot share a single `Processor` instance (ie, the supplier must return a new object each time it's invoked). You should also not have and shared state (like `static` variable). If you share the same processor instance for two tasks, the tasks are not isolated any longer (as they should be) and it could lead to what you observe. was (Author: mjsax): If this happens in "steady state" it would be very suspicious. You monitor / verify the task assignment from the logs, that should give you more intel. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
cmccabe commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r579433300 ## File path: settings.gradle ## @@ -29,6 +29,7 @@ include 'clients', 'log4j-appender', 'metadata', 'raft', +'shell', Review comment: It would be nice to have it as just "shell" until we get another shell This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287324#comment-17287324 ] Matthias J. Sax commented on KAFKA-12328: - If this happens in "steady state" it would be very suspicious. You monitor / verify the task assignment from the logs, that should give you more intel. > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
cmccabe commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r579432108 ## File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metalog; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * The LocalLogManager is a test implementation that relies on the contents of memory. + */ +public final class LocalLogManager implements MetaLogManager, AutoCloseable { Review comment: Yes, this should not be present twice. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
cmccabe commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r579431376 ## File path: core/src/main/scala/kafka/server/Server.scala ## @@ -29,6 +29,7 @@ trait Server { } object Server { + val metadataTopicName = "@metadata" Review comment: yes, let's do that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287316#comment-17287316 ] Matthias J. Sax edited comment on KAFKA-7499 at 2/19/21, 7:34 PM: -- [~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe [~high.lee] wants to take it over (asked about it above). was (Author: mjsax): [~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe [~high.lee] wants to take it over (he asked about it above). > Extend ProductionExceptionHandler to cover serialization exceptions > --- > > Key: KAFKA-7499 > URL: https://issues.apache.org/jira/browse/KAFKA-7499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, kip, newbie > > In > [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce], > an exception handler for the write path was introduced. This exception > handler covers exception that are raised in the producer callback. > However, serialization happens before the data is handed to the producer with > Kafka Streams itself and the producer uses `byte[]/byte[]` key-value-pair > types. > Thus, we might want to extend the ProductionExceptionHandler to cover > serialization exception, too, to skip over corrupted output messages. An > example could be a "String" message that contains invalid JSON and should be > serialized as JSON. > KIP-399 (not voted yet; feel free to pick it up): > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287316#comment-17287316 ] Matthias J. Sax commented on KAFKA-7499: [~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe [~high.lee] wants to take it over (he asked about it above). > Extend ProductionExceptionHandler to cover serialization exceptions > --- > > Key: KAFKA-7499 > URL: https://issues.apache.org/jira/browse/KAFKA-7499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, kip, newbie > > In > [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce], > an exception handler for the write path was introduced. This exception > handler covers exception that are raised in the producer callback. > However, serialization happens before the data is handed to the producer with > Kafka Streams itself and the producer uses `byte[]/byte[]` key-value-pair > types. > Thus, we might want to extend the ProductionExceptionHandler to cover > serialization exception, too, to skip over corrupted output messages. An > example could be a "String" message that contains invalid JSON and should be > serialized as JSON. > KIP-399 (not voted yet; feel free to pick it up): > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jbfletch reassigned KAFKA-7499: --- Assignee: (was: jbfletch) > Extend ProductionExceptionHandler to cover serialization exceptions > --- > > Key: KAFKA-7499 > URL: https://issues.apache.org/jira/browse/KAFKA-7499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, kip, newbie > > In > [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce], > an exception handler for the write path was introduced. This exception > handler covers exception that are raised in the producer callback. > However, serialization happens before the data is handed to the producer with > Kafka Streams itself and the producer uses `byte[]/byte[]` key-value-pair > types. > Thus, we might want to extend the ProductionExceptionHandler to cover > serialization exception, too, to skip over corrupted output messages. An > example could be a "String" message that contains invalid JSON and should be > serialized as JSON. > KIP-399 (not voted yet; feel free to pick it up): > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579411851 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private fina
[GitHub] [kafka] bbejeck commented on pull request #10150: KAFKA-3745: Add access to read-only key in value joiner
bbejeck commented on pull request #10150: URL: https://github.com/apache/kafka/pull/10150#issuecomment-782288891 Java 11 passed Java 8 and Java 15 failed with ``` JDK 15 org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers JDK 8 org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers kafka.api.ConsumerBounceTest.testClose() org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12332) Error partitions from topics with invalid IDs in LISR requests
[ https://issues.apache.org/jira/browse/KAFKA-12332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12332. - Resolution: Fixed > Error partitions from topics with invalid IDs in LISR requests > -- > > Key: KAFKA-12332 > URL: https://issues.apache.org/jira/browse/KAFKA-12332 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > In a situation where topics are deleted and recreated in a short amount of > time, LeaderAndIsr requests can contain topics with invalid IDs, but correct > epochs. In this case, we will incorrectly handle the request and simply log > an error message on the broker. It will be more useful to not handle the > request for the partition with an invalid ID and send and error back to the > controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests
hachikuji merged pull request #10143: URL: https://github.com/apache/kafka/pull/10143 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe closed pull request #10155: Fix Raft broker restart issue when offset partitions are deferred
cmccabe closed pull request #10155: URL: https://github.com/apache/kafka/pull/10155 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred
cmccabe commented on pull request #10155: URL: https://github.com/apache/kafka/pull/10155#issuecomment-782280589 committed to trunk and 2.8 after running tests locally This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10105: Kip500 full
rondagostino commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r579407572 ## File path: build.gradle ## @@ -1029,6 +1029,7 @@ project(':metadata') { compile project(':clients') compile libs.jacksonDatabind compile libs.jacksonJDK8Datatypes +compile libs.metrics Review comment: > file a JIRA to discuss whether the metadata module should use Kafka metrics or Yammer metrics. https://issues.apache.org/jira/browse/KAFKA-12348 cc: @cmccabe This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12348) The metadata module currently uses Yammer metrics. Should it uses Kafka metrics instead?
Ron Dagostino created KAFKA-12348: - Summary: The metadata module currently uses Yammer metrics. Should it uses Kafka metrics instead? Key: KAFKA-12348 URL: https://issues.apache.org/jira/browse/KAFKA-12348 Project: Kafka Issue Type: Task Components: metrics Affects Versions: 3.0.0, 2.8.0 Reporter: Ron Dagostino -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
kkonstantine commented on a change in pull request #10158: URL: https://github.com/apache/kafka/pull/10158#discussion_r579403166 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -366,6 +346,42 @@ private void readToLogEnd() { } } +// Visible for testing +Map readEndOffsets(Set assignment) { +log.trace("Reading to end of offset log"); + +Map endOffsets; +// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions. +// That is because it's possible that the consumer is already blocked waiting for new records to appear, when +// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least +// one more record becomes available, meaning we can't even check whether we're at the end offset. +// Since all we're trying to do here is get the end offset, we should use the supplied admin client +// (if available) +// (which prevents 'consumer.endOffsets(...)' +// from + +// Deprecated constructors do not provide an admin supplier, so the admin is potentially null. +if (useAdminForListOffsets) { +// Use the admin client to immediately find the end offsets for the assigned topic partitions. +// Unlike using the consumer +try { +return admin.endOffsets(assignment); +} catch (UnsupportedVersionException e) { +// This may happen with really old brokers that don't support the auto topic creation +// field in metadata requests +log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage()); +useAdminForListOffsets = false; Review comment: Good point. As long as `SharedTopicAdmin` admin won't be recycled. That's negligible anyways, I was referring to the pattern mainly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
kkonstantine commented on a change in pull request #10158: URL: https://github.com/apache/kafka/pull/10158#discussion_r579395276 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -161,6 +164,7 @@ public void start() { // Create the topic admin client and initialize the topic ... admin = topicAdminSupplier.get(); // may be null +useAdminForListOffsets = admin != null; Review comment: My main observation here is that keeping initialization in the constructor allows us to declare read only fields as `final`. It also offers better exception safety, in the sense that when an object is constructed (the call to its constructor succeeds) then we know that more or less we have a functional instance of that class. I'm not ignoring the cases where the initialization of variables has to happen at a later time during the call of `start`. But hopefully these cases are minimal, or else `start` ends up being the actual constructor of objects like this one here. Which can be redundant and in some cases more risky. As I mentioned, since `admin` is not immutable here (can be set to `null` later to support connection with older brokers) I'm fine keeping this call in `start`. But I'd still recommend not pushing initializations outside the constructor if they can happen at the time of the object construction (as is the case with `admin`'s initial initialization here). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r579378306 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,875 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.BrokersToIsrs.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; + + +/** + * The ReplicationControlManager is the part of the controller which deals with topics + * and partitions. It is responsible for managing the in-sync replica set and leader + * of each partition, as well as administrative tasks like creating or deleting topics. + */ +public class ReplicationControlManager { +static class TopicControlInfo { +private final Uuid id; +private final TimelineHashMap parts; + +TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) { +this.id = id; +this.parts = new TimelineHashMap<>(snapshotRegistry, 0); +} +} + +static class PartitionControlInfo { +private final int[] replicas; +private final int[] isr; +private final int[] removingReplicas; +private final int[] addingReplicas; +private final int leader; +private final int leaderEpoch; +private final int partitionEpoch; + +