[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type

2021-02-19 Thread GitBox


chia7712 commented on a change in pull request #10162:
URL: https://github.com/apache/kafka/pull/10162#discussion_r579615450



##
File path: clients/src/main/resources/common/message/README.md
##
@@ -85,6 +87,8 @@ There are several primitive field types available.
 
 * "bytes": binary data.
 
+* "records": record set used in fetch api and fetch snapshot api

Review comment:
   `records` is also used by the produce API.
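
For context, in the JSON message specs this README documents, a `records` field looks roughly like the following (paraphrased from the produce schema rather than copied verbatim):

```json
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+",
  "about": "The record data to be produced." }
```

Fields of this type appear in the produce schema as well as the fetch and fetch snapshot schemas, which is the reviewer's point above.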





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10124: MINOR: apply Utils.isBlank to code base

2021-02-19 Thread GitBox


chia7712 merged pull request #10124:
URL: https://github.com/apache/kafka/pull/10124
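
For readers outside the PR: `Utils.isBlank` lives in `org.apache.kafka.common.utils.Utils`, and this cleanup replaces hand-rolled null-or-whitespace checks with it. A minimal before/after sketch (illustrative, not an excerpt from the PR):

```java
import org.apache.kafka.common.utils.Utils;

public class IsBlankExample {
    public static void main(String[] args) {
        String name = "   ";
        // The pattern scattered through the code base before this PR:
        boolean before = name == null || name.trim().isEmpty();
        // The single call that replaces it:
        boolean after = Utils.isBlank(name);
        System.out.println(before + " " + after); // prints: true true
    }
}
```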


   







[GitHub] [kafka] chia7712 commented on pull request #10124: MINOR: apply Utils.isBlank to code base

2021-02-19 Thread GitBox


chia7712 commented on pull request #10124:
URL: https://github.com/apache/kafka/pull/10124#issuecomment-782552393


   `kafka.admin.ListConsumerGroupTest.testListConsumerGroupsWithStates()` failed with an unrelated error.







[GitHub] [kafka] chia7712 merged pull request #10149: AbstractCoordinator should log with its subclass

2021-02-19 Thread GitBox


chia7712 merged pull request #10149:
URL: https://github.com/apache/kafka/pull/10149
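
The gist of the change, per the title: create the coordinator's logger from `getClass()` so that subclasses log under their own names. A minimal sketch of the idea, with illustrative class names rather than the actual coordinator code:

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

abstract class CoordinatorBase {
    protected final Logger log;

    CoordinatorBase(LogContext logContext) {
        // getClass() resolves to the concrete subclass at runtime, so a consumer or
        // Connect coordinator shows up in the logs under its own class name.
        this.log = logContext.logger(getClass());
    }
}

class GroupCoordinatorExample extends CoordinatorBase {
    GroupCoordinatorExample(LogContext logContext) {
        super(logContext);
        log.info("constructed"); // attributed to GroupCoordinatorExample, not CoordinatorBase
    }
}
```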


   







[GitHub] [kafka] chia7712 merged pull request #10148: Update KafkaConsumerMetrics.java

2021-02-19 Thread GitBox


chia7712 merged pull request #10148:
URL: https://github.com/apache/kafka/pull/10148


   







[jira] [Resolved] (KAFKA-12335) Upgrade junit from 5.7.0 to 5.7.1

2021-02-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12335.

Fix Version/s: 3.0.0
   Resolution: Fixed

> Upgrade junit from 5.7.0 to 5.7.1
> -
>
> Key: KAFKA-12335
> URL: https://issues.apache.org/jira/browse/KAFKA-12335
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.0.0
>
>
> junit 5.7.1 release notes: 
> [https://junit.org/junit5/docs/5.7.1/release-notes/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10145: KAFKA-12335 Upgrade junit from 5.7.0 to 5.7.1

2021-02-19 Thread GitBox


chia7712 merged pull request #10145:
URL: https://github.com/apache/kafka/pull/10145


   







[GitHub] [kafka] chia7712 commented on pull request #10145: KAFKA-12335 Upgrade junit from 5.7.0 to 5.7.1

2021-02-19 Thread GitBox


chia7712 commented on pull request #10145:
URL: https://github.com/apache/kafka/pull/10145#issuecomment-782549839


   `Build / JDK 15 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector` #10077

   `org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings` #10152 #10158







[jira] [Updated] (KAFKA-12339) Add retry to admin client's listOffsets

2021-02-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12339:
---
Summary: Add retry to admin client's listOffsets  (was: Starting new 
connector cluster with new internal topics encounters 
UnknownTopicOrPartitionException)

> Add retry to admin client's listOffsets
> ---
>
> Key: KAFKA-12339
> URL: https://issues.apache.org/jira/browse/KAFKA-12339
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
>
> After upgrading our connector environment to 2.9.0-SNAPSHOT, the connect
> cluster sometimes encounters the following error.
> {quote}Uncaught exception in herder work thread, exiting:  
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:324)
> org.apache.kafka.connect.errors.ConnectException: Error while getting end 
> offsets for topic 'connect-storage-topic-connect-cluster-1'
> at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:689)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338)
> at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195)
> at 
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:216)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:129)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:310)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668)
> ... 10 more
> {quote}
> [https://github.com/apache/kafka/pull/9780] added a shared admin client to get
> end offsets. KafkaAdmin#listOffsets does not handle topic-level errors, so a
> topic-level UnknownTopicOrPartitionException can block the worker from running
> when a new internal topic is NOT yet synced to all brokers.
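
A minimal sketch of the retry described in the summary, using only the public Admin API; the method name, backoff, and attempt limit are illustrative, not the actual patch:

{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

class ListOffsetsRetrySketch {
    static Map<TopicPartition, ListOffsetsResultInfo> endOffsetsWithRetry(
            Admin admin, Map<TopicPartition, OffsetSpec> request,
            int maxAttempts) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return admin.listOffsets(request).all().get();
            } catch (ExecutionException e) {
                // A topic-level UnknownTopicOrPartitionException can simply mean the
                // new internal topic has not propagated to every broker yet, so retry.
                if (e.getCause() instanceof UnknownTopicOrPartitionException
                        && attempt < maxAttempts) {
                    Thread.sleep(100L * attempt);
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}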





[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

2021-02-19 Thread GitBox


chia7712 commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-782527168


   > Also might be good to note in the description that the error could be
retriable in the same way as the one on partition metadata, and it's the fact
that it is not retried (here or in the connect worker) that creates the issue.
   
   Should we cherry-pick this patch to other active branches?







[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

2021-02-19 Thread GitBox


chia7712 commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-782526238


   @rhauch @kkonstantine Thanks for your reviews!







[GitHub] [kafka] cmccabe merged pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe merged pull request #10070:
URL: https://github.com/apache/kafka/pull/10070


   







[GitHub] [kafka] cmccabe commented on pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#issuecomment-782521288


   Thanks, @junrao ! :)







[jira] [Commented] (KAFKA-10817) Add clusterId validation to Fetch handling

2021-02-19 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287447#comment-17287447
 ] 

dengziming commented on KAFKA-10817:


Is clusterId validation only applied to Fetch requests? Should we also apply it
to BeginQuorum, EndQuorum, and FetchSnapshot?

> Add clusterId validation to Fetch handling
> --
>
> Key: KAFKA-10817
> URL: https://issues.apache.org/jira/browse/KAFKA-10817
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Initially we were unsure how the clusterId would be generated by the cluster 
> after Zookeeper removal, so we did not implement it. It is looking now like 
> we will probably require users to generate it manually prior to starting the 
> cluster. See here for details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorum-based+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-Fencing.
>  In this case, we can assume that the clusterId will be provided when 
> instantiating the raft client, so we can add the logic to the request handler 
> to validate it in inbound Fetch requests.
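
A minimal sketch of the validation described above; the class and field names are illustrative, not the actual raft request handler:

{code:java}
import org.apache.kafka.common.protocol.Errors;

class ClusterIdValidator {
    private final String clusterId; // supplied when the raft client is instantiated

    ClusterIdValidator(String clusterId) {
        this.clusterId = clusterId;
    }

    Errors validate(String requestClusterId) {
        // Tolerate a null clusterId so older clients are not rejected outright.
        if (requestClusterId != null && !requestClusterId.equals(clusterId)) {
            return Errors.INCONSISTENT_CLUSTER_ID;
        }
        return Errors.NONE;
    }
}
{code}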





[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579565540



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+ 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579564654



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public B

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578003900



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public Bu

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578065045



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public Bu

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579564579



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public B

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579564268



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+
+public class FeatureControlManager {
+/**
+ * The features supported by this controller's software.
+ */
+private final Map<String, VersionRange> supportedFeatures;
+
+/**
+ * Maps feature names to finalized version ranges.
+ */
+private final TimelineHashMap<String, VersionRange> finalizedVersions;
+
+/**
+ * The latest feature epoch.
+ */
+private final TimelineHashSet<Long> epoch;
+
+FeatureControlManager(Map<String, VersionRange> supportedFeatures,
+  SnapshotRegistry snapshotRegistry) {
+this.supportedFeatures = supportedFeatures;
+this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.epoch = new TimelineHashSet<>(snapshotRegistry, 0);
+}
+
+ControllerResult<Map<String, ApiError>> updateFeatures(
+Map<String, VersionRange> updates, Set<String> downgradeables,
+Map<Integer, Map<String, VersionRange>> brokerFeatures) {
+TreeMap<String, ApiError> results = new TreeMap<>();
+List<ApiMessageAndVersion> records = new ArrayList<>();
+for (Entry<String, VersionRange> entry : updates.entrySet()) {
+results.put(entry.getKey(), updateFeature(entry.getKey(), 
entry.getValue(),
+downgradeables.contains(entry.getKey()), brokerFeatures, 
records));
+}
+return new ControllerResult<>(records, results);
+}
+
+private ApiError updateFeature(String featureName,
+   VersionRange newRange,
+   boolean downgradeable,
+   Map<Integer, Map<String, VersionRange>> brokerFeatures,
+   List<ApiMessageAndVersion> records) {
+if (newRange.min() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The lower value for the new range cannot be less than 1.");
+}
+if (newRange.max() <= 0) {
+return new ApiError(Errors.INVALID_UPDATE_VERSION,
+"The upper value for the new range cannot be less than 1.");
+}
+VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
   let's revisit this after 2.8









[jira] [Resolved] (KAFKA-9672) Dead brokers in ISR cause isr-expiration to fail with exception

2021-02-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-9672.

Fix Version/s: 3.0.0
   Resolution: Fixed

Merged the PR to trunk.

> Dead brokers in ISR cause isr-expiration to fail with exception
> ---
>
> Key: KAFKA-9672
> URL: https://issues.apache.org/jira/browse/KAFKA-9672
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Ivan Yurchenko
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
> Fix For: 3.0.0
>
>
> We're running Kafka 2.4 and facing a pretty strange situation.
>  Let's say there were three brokers in the cluster 0, 1, and 2. Then:
>  1. Broker 3 was added.
>  2. Partitions were reassigned from broker 0 to broker 3.
>  3. Broker 0 was shut down (not gracefully) and removed from the cluster.
>  4. We see the following state in ZooKeeper:
> {code:java}
> ls /brokers/ids
> [1, 2, 3]
> get /brokers/topics/foo
> {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
> get /brokers/topics/foo/partitions/0/state
> {"controller_epoch":123,"leader":1,"version":1,"leader_epoch":42,"isr":[0,2,3,1]}
> {code}
> It means the dead broker 0 remains in the partitions' ISR. A big share of 
> the partitions in the cluster have this issue.
> This is actually causing errors:
> {code:java}
> Uncaught exception in scheduled task 'isr-expiration' 
> (kafka.utils.KafkaScheduler)
> org.apache.kafka.common.errors.ReplicaNotAvailableException: Replica with id 
> 12 is not available on broker 17
> {code}
> It means that effectively {{isr-expiration}} task is not working any more.
> I have a suspicion that this was introduced by [this commit (line 
> selected)|https://github.com/apache/kafka/commit/57baa4079d9fc14103411f790b9a025c9f2146a4#diff-5450baca03f57b9f2030f93a480e6969R856]
> Unfortunately, I haven't been able to reproduce this in isolation.
> Any hints about how to reproduce (so I can write a patch) or mitigate the 
> issue on a running cluster are welcome.
> Generally, I assume that not throwing {{ReplicaNotAvailableException}} for a 
> dead (i.e. non-existent) broker, and instead considering it out-of-sync and 
> removing it from the ISR, should fix the problem.
>  
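
A minimal sketch of the mitigation proposed in the last paragraph, with illustrative names rather than the actual partition code: treat replicas whose brokers no longer exist as out-of-sync instead of throwing.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class IsrPruneSketch {
    // Restrict the ISR to brokers that are still registered, so a dead broker left
    // behind in the partition state cannot break the isr-expiration task.
    static List<Integer> pruneDeadReplicas(List<Integer> isr, Set<Integer> liveBrokerIds) {
        return isr.stream()
                  .filter(liveBrokerIds::contains)
                  .collect(Collectors.toList());
    }
}
{code}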





[GitHub] [kafka] junrao merged pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2021-02-19 Thread GitBox


junrao merged pull request #9631:
URL: https://github.com/apache/kafka/pull/9631


   







[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579553251



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+/**
+ * A special value used to represent the leader for a partition with no 
leader. 
+ */
+public static final int NO_LEADER = -1;
+
+/**
+ * A special value used to represent a PartitionChangeRecord that does not 
change the
+ * partition leader.
+ */
+public static final int NO_LEADER_CHANGE = -2;
+
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(sn

[jira] [Updated] (KAFKA-12343) Recent change to use SharedTopicAdmin in KafkaBasedLog fails with AK 0.10.x brokers

2021-02-19 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12343:
--
Fix Version/s: 2.6.2
   2.7.1
   2.8.0
   2.5.2

> Recent change to use SharedTopicAdmin in KafkaBasedLog fails with AK 0.10.x 
> brokers
> 
>
> Key: KAFKA-12343
> URL: https://issues.apache.org/jira/browse/KAFKA-12343
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
>
> System test failure 
> ([sample|http://confluent-kafka-2-7-system-test-results.s3-us-west-2.amazonaws.com/2021-02-18--001.1613655226--confluentinc--2.7--54952635e5/report.html]):
> {code:java}
> Java.lang.Exception: UnsupportedVersionException: MetadataRequest versions 
> older than 4 don't support the allowAutoTopicCreation field
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:755)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1136)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1224)
> at java.lang.Thread.run(Thread.java:748)
> [2021-02-16 12:05:11,735] ERROR [Worker clientId=connect-1, 
> groupId=connect-cluster] Uncaught exception in herder work thread, exiting:  
> (org.apache.kafka.connect.runtime.distributed.Di
> stributedHerder)
> org.apache.kafka.connect.errors.ConnectException: API to get the get the end 
> offsets for topic 'connect-offsets' is unsupported on brokers at worker25:9092
> at 
> org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:680)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195)
> at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:136)
> at org.apache.kafka.connect.runtime.Worker.start(Worker.java:197)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:128)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:311)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest 
> versions older than 4 don't support the allowAutoTopicCre
> ation field
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668)
> ... 11 more   {code}
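
A minimal sketch of a compatibility fallback consistent with this report (the actual fix may differ): if the admin-based end-offset lookup fails with UnsupportedVersionException on an old broker, fall back to the consumer-based lookup that KafkaBasedLog used before the change. TopicAdminLike is a stand-in interface so the sketch stays self-contained:

{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

class EndOffsetsFallbackSketch {
    // Stand-in for org.apache.kafka.connect.util.TopicAdmin.
    interface TopicAdminLike {
        Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions);
    }

    static Map<TopicPartition, Long> endOffsets(TopicAdminLike admin,
                                                Consumer<byte[], byte[]> consumer,
                                                Set<TopicPartition> partitions) {
        try {
            return admin.endOffsets(partitions);
        } catch (UnsupportedVersionException e) {
            // The broker is too old for the admin listOffsets API; the consumer
            // path still works against 0.10.x brokers.
            return consumer.endOffsets(partitions);
        }
    }
}
{code}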





[jira] [Resolved] (KAFKA-12343) Recent change to use SharedTopicAdmin in KafkaBasedLog fails with AK 0.10.x brokers

2021-02-19 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-12343.
---
  Reviewer: Konstantine Karantasis
Resolution: Fixed

> Recent change to use SharedTopicAdmin in KafkaBasedLog fails with AK 0.10.x 
> brokers
> 
>
> Key: KAFKA-12343
> URL: https://issues.apache.org/jira/browse/KAFKA-12343
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
>
> System test failure 
> ([sample|http://confluent-kafka-2-7-system-test-results.s3-us-west-2.amazonaws.com/2021-02-18--001.1613655226--confluentinc--2.7--54952635e5/report.html]):
> {code:java}
> Java.lang.Exception: UnsupportedVersionException: MetadataRequest versions 
> older than 4 don't support the allowAutoTopicCreation field
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:755)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1136)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1224)
> at java.lang.Thread.run(Thread.java:748)
> [2021-02-16 12:05:11,735] ERROR [Worker clientId=connect-1, 
> groupId=connect-cluster] Uncaught exception in herder work thread, exiting:  
> (org.apache.kafka.connect.runtime.distributed.Di
> stributedHerder)
> org.apache.kafka.connect.errors.ConnectException: API to get the get the end 
> offsets for topic 'connect-offsets' is unsupported on brokers at worker25:9092
> at 
> org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:680)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195)
> at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:136)
> at org.apache.kafka.connect.runtime.Worker.start(Worker.java:197)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:128)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:311)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest 
> versions older than 4 don't support the allowAutoTopicCre
> ation field
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668)
> ... 11 more   {code}





[GitHub] [kafka] jsancio edited a comment on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2021-02-19 Thread GitBox


jsancio edited a comment on pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265


   @junrao I merged the latest trunk to this PR. Jenkins ran the tests and all 
the failures look unrelated.







[GitHub] [kafka] jsancio commented on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2021-02-19 Thread GitBox


jsancio commented on pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265


   @junrao I merged the latest master to this PR. Jenkins ran the tests and all 
the failures look unrelated.







[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579547910



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579547825



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579547731



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[jira] [Created] (KAFKA-12349) Follow up on PartitionEpoch in KIP-500

2021-02-19 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12349:


 Summary: Follow up on PartitionEpoch in KIP-500
 Key: KAFKA-12349
 URL: https://issues.apache.org/jira/browse/KAFKA-12349
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


* Remove the compatibility shim between raft and the KIP-500 controller

* Standardize on the epoch data type (probably int)

* Review partition epoch and leader epoch



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579547404



##
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##
@@ -34,6 +34,8 @@
 { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
   "about": "The lead replica, or -1 if there is no leader." },
 { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
-  "about": "An epoch that gets incremented each time we change the ISR." }
+  "about": "An epoch that gets incremented each time we change the ISR." },
+{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",

Review comment:
   Filed KAFKA-12349.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579547098



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+/**
+ * A special value used to represent the leader for a partition with no 
leader. 
+ */
+public static final int NO_LEADER = -1;
+
+/**
+ * A special value used to represent a PartitionChangeRecord that does not 
change the
+ * partition leader.
+ */
+public static final int NO_LEADER_CHANGE = -2;
+
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(s

[jira] [Commented] (KAFKA-12169) Consumer cannot detect partition changes when client leader restarts with static membership protocol

2021-02-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287431#comment-17287431
 ] 

Guozhang Wang commented on KAFKA-12169:
---

Inside the AbstractCoordinator:

{code}
public synchronized boolean checkAssignmentMatchedSubscription(Collection<TopicPartition> assignments) {
    for (TopicPartition topicPartition : assignments) {
        if (this.subscribedPattern != null) {
            if (!this.subscribedPattern.matcher(topicPartition.topic()).matches()) {
                log.info("Assigned partition {} for non-subscribed topic regex pattern; subscription pattern is {}",
                    topicPartition,
                    this.subscribedPattern);

                return false;
            }
        } else {
            if (!this.subscription.contains(topicPartition.topic())) {
                log.info("Assigned partition {} for non-subscribed topic; subscription is {}", topicPartition, this.subscription);

                return false;
            }
        }
    }

    return true;
}
{code}

We only check that the topic names are included in the subscription, but do not 
check their num.partitions against the metadata.
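
For illustration only, a hedged sketch of the kind of additional check this implies. The method name, the leader-side placement, and the metadata parameter are assumptions for discussion, not the actual client internals; java.util collections and org.apache.kafka.common.TopicPartition are assumed imported.

{code}
// Hypothetical leader-side validation (not actual Kafka code): after the
// assignor runs, confirm that every partition known from metadata was handed
// to some member, so an assignment computed from a stale metadata snapshot
// (e.g. 1000 of 2000 partitions) is detected and a rebalance can be forced.
static boolean assignmentCoversAllPartitions(final Map<String, List<TopicPartition>> memberAssignments,
                                             final Map<String, Integer> partitionCountsFromMetadata) {
    final Map<String, Set<Integer>> covered = new HashMap<>();
    for (final List<TopicPartition> partitions : memberAssignments.values()) {
        for (final TopicPartition tp : partitions) {
            covered.computeIfAbsent(tp.topic(), t -> new HashSet<>()).add(tp.partition());
        }
    }
    for (final Map.Entry<String, Integer> entry : partitionCountsFromMetadata.entrySet()) {
        final Set<Integer> assigned = covered.getOrDefault(entry.getKey(), Collections.emptySet());
        if (assigned.size() < entry.getValue()) {
            return false; // some partitions (e.g. newly added ones) were never assigned
        }
    }
    return true;
}
{code}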

> Consumer cannot detect partition changes when client leader restarts with 
> static membership protocol
> -
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>  Labels: bug
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on Kubernetes, and services often restart because of 
> operations. When we increased the topic's partitions from 1000 to 2000 and 
> the client leader restarted with an unknown member id at the same time, we 
> found the consumers did not trigger a rebalance and still consumed 1000 partitions
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-02-19 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579543547



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+endOffsets.put(topicPartition, 0L);
+emptyConsumer.updateEndOffsets(endOffsets);
+
+final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+beginningOffsets.put(topicPartition, 0L);
+emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+assertEquals(0, records.count());

Review comment:
   @mjsax I have changed the PR to make sure that the tests verify the 
committed offset. It seems to me, however, that the original test implementer's 
intent was to verify the position of the consumer after the method call, not 
whether it committed the offset, especially because the ```position()``` and 
```commitAsync()``` calls happen outside the methods under test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


cmccabe commented on pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#issuecomment-782461539


   Ran the test manually and committed from the command line.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe closed pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


cmccabe closed pull request #10094:
URL: https://github.com/apache/kafka/pull/10094


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579538752



##
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##
@@ -34,6 +34,8 @@
 { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
   "about": "The lead replica, or -1 if there is no leader." },
 { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
-  "about": "An epoch that gets incremented each time we change the ISR." }
+  "about": "An epoch that gets incremented each time we change the ISR." },
+{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",

Review comment:
   Could you file a separate jira to follow up on PartitionEpoch post 2.8?


##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry

[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties

2021-02-19 Thread GitBox


tinawenqiao commented on pull request #9235:
URL: https://github.com/apache/kafka/pull/9235#issuecomment-782437733


   > Thanks for the update, it looks much better.
   > 
   > I should have been clearer, I meant to update the documentation text of 
these settings, not just the comments in `WorkerConfigs`.
   > 
   > For example in 
https://github.com/apache/kafka/pull/9235/files#diff-5664ddd985065623bab1870e031ce4f3fcfee0bc4951e41b6b0e1584cd32fa2dR153,
 we could do something like:
   > 
   > ```java
   > private static final String REST_HOST_NAME_DOC
   > = "Hostname for the REST API. If this is set, it will only bind to 
this interface. Deprecated, only used when listeners is not set. Use listeners 
instead.";
   > ```
   
   The doc descriptions have been added.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch merged pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

2021-02-19 Thread GitBox


rhauch merged pull request #10152:
URL: https://github.com/apache/kafka/pull/10152


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579522827



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579522201



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[jira] [Resolved] (KAFKA-10817) Add clusterId validation to Fetch handling

2021-02-19 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10817.
-
Resolution: Fixed

> Add clusterId validation to Fetch handling
> --
>
> Key: KAFKA-10817
> URL: https://issues.apache.org/jira/browse/KAFKA-10817
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Initially we were unsure how the clusterId would be generated by the cluster 
> after ZooKeeper removal, so we did not implement it. It now looks like 
> we will probably require users to generate it manually before starting the 
> cluster. See here for details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorum-based+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-Fencing.
>  In this case, we can assume that the clusterId will be provided when 
> instantiating the raft client, so we can add the logic to the request handler 
> to validate it in inbound Fetch requests.
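
A minimal sketch of the kind of guard described, assuming the clusterId is supplied when the raft client is constructed; the handler shape and names below are illustrative, not the merged change:

{code:java}
// Hypothetical fetch-path check: reject Fetch requests whose clusterId does
// not match the id the raft client was instantiated with. A null clusterId
// is tolerated for clients that do not send one.
private boolean hasValidClusterId(final FetchRequestData request) {
    return request.clusterId() == null || request.clusterId().equals(this.clusterId);
}

// Sketch of usage inside the request handler:
//   if (!hasValidClusterId(request))
//       return new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
{code}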



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-19 Thread GitBox


hachikuji merged pull request #10129:
URL: https://github.com/apache/kafka/pull/10129


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-19 Thread GitBox


hachikuji commented on pull request #10129:
URL: https://github.com/apache/kafka/pull/10129#issuecomment-782423782


   I verified the tests locally. I am going to merge to trunk and 2.8 since we 
seem to be blocked by Jenkins at the moment.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-02-19 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+endOffsets.put(topicPartition, 0L);
+emptyConsumer.updateEndOffsets(endOffsets);
+
+final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+beginningOffsets.put(topicPartition, 0L);
+emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+assertEquals(0, records.count());

Review comment:
   @mjsax I guess I got your point. In the CUT, the call 
```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` 
makes the consumer client seek to that offset (if available) without committing. 
The call to commit the offset happens in another section of the code that is not 
under test here (line 407 of StreamsResetter.java).
   
   I will change the test logic so that the "given condition" uses 
```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` 
and the "when condition" is the call to ```client.commitSync();```.
   
   This way, the "then condition" can assert on the committed 
offsets. The same applies to the other test.
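
   For reference, a minimal sketch of the reworked given/when/then structure described above; the commit and committed-offset assertions are illustrative, not the final PR code:

```java
@Test
public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
    // given: an assigned partition with no records and a resetter that has run
    final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    emptyConsumer.assign(Collections.singletonList(topicPartition));
    emptyConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
    emptyConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
    streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);

    // when: the positions set by the resetter are actually committed
    emptyConsumer.commitSync();

    // then: the committed offset reflects the reset (the end of the empty partition)
    final OffsetAndMetadata committed =
        emptyConsumer.committed(Collections.singleton(topicPartition)).get(topicPartition);
    assertEquals(0L, committed.offset());
}
```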






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579516434



##
File path: settings.gradle
##
@@ -29,6 +29,7 @@ include 'clients',
 'log4j-appender',
 'metadata',
 'raft',
+'shell',

Review comment:
   Ok. I was mainly objecting to the generality of the name, since this 
is focused only on the metadata for KIP-500.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-02-19 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579504541



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,25 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
   You are right - indeed the test is orthogonal to the fix. I decided 
to add it there due to the lack of any test scenario for resetting a partition's 
offset (to a specific offset) on an empty partition. The original CUT passed 
this test without the bug-fix, but I considered it an important scenario to test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-02-19 Thread GitBox


MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579502090



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
 final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
 for (final TopicPartition topicPartition : inputTopicPartitions) {
-client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+.map(OffsetAndTimestamp::offset)
+.filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+if (partitionOffset.isPresent()) {
+client.seek(topicPartition, partitionOffset.get());
+} else {
+client.seekToEnd(Collections.singletonList(topicPartition));
+System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+" is empty, without a committed record. Falling back to latest known offset.");

Review comment:
   I see your point; I don't mind removing the "without a committed record" 
part of the message. @jeqo This would have to be updated in the Scala code too, 
since I saw that the messages are about the same. Would that be ok?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579497748



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fina

[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287386#comment-17287386
 ] 

fml2 edited comment on KAFKA-12328 at 2/19/21, 9:53 PM:


{quote}In general, the ideas of the "supplier pattern" is _inherently_ to 
return a new object each time it's invoked
{quote}
According to my understanding, the supplier pattern gives you _the possibility_ 
to return a new instance on each invocation – thus adjusting the result to the 
current situation if needed. But, as a pattern, it does not prohibit returning 
the same instance if this flexibility is not needed.


was (Author: fml2):
{quote}In general, the ideas of the "supplier pattern" is _inherently_ to 
return a new object each time it's invoked
{quote}
According to my understanding, the supplier pattern gives you _the possibilty_ 
to return a new instance on each invoke – thus adjusting the result to the 
current situation if needed. But, as a pattern, it does not prohibit to also 
return the same instance if this flexibility is not needed.

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.
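
For reference, the workaround from the linked answer boils down to capturing the partition once in {{init()}}; a minimal sketch (assuming the 2.x API, where {{ProcessorContext#taskId()}} returns a {{TaskId}} with a public {{partition}} field; the class name is a placeholder):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class SummaryProcessor implements Processor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        // The task id identifies the partition this task (and hence this
        // punctuator's store iterator) is bound to.
        final int partition = context.taskId().partition;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> System.out.println("N entries processed in partition " + partition));
    }

    @Override
    public void process(final String key, final String value) { }

    @Override
    public void close() { }
}
{code}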



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287386#comment-17287386
 ] 

fml2 commented on KAFKA-12328:
--

{quote}In general, the ideas of the "supplier pattern" is _inherently_ to 
return a new object each time it's invoked
{quote}
According to my understanding, the supplier pattern gives you _the possibility_ 
to return a new instance on each invocation – thus adjusting the result to the 
current situation if needed. But, as a pattern, it does not prohibit returning 
the same instance if this flexibility is not needed.

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579496903



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fina

[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287372#comment-17287372
 ] 

fml2 edited comment on KAFKA-12328 at 2/19/21, 9:50 PM:


{quote}ie, the supplier must return a new object each time it's invoked
{quote}
Yes, that must be the error! I return the same instance in each supplier 
invocation. The docs do not express clearly enough, IMO, that a NEW instance 
must be returned each time.

Could you please elaborate on this? Why is it required that the supplier 
return a new instance on each invocation? If I returned the same instance and 
it were thread safe (getting all thread-specific information from the context), 
then a single instance would suffice.

The context would have to be a proxy object in this case, i.e. a "static" 
(unchanging) value within the transformer/processor that internally returns 
the data specific to the particular task on each call, similar to proxy 
objects in Spring.

But, apparently, this is not how Kafka is implemented now, so I have to do 
this myself, right?
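
For illustration, a minimal sketch of the difference (assumptions mine; {{MyProcessor}} is a hypothetical {{Processor}} implementation that stores the {{ProcessorContext}} passed to {{init()}}):

{code:java}
// Broken: one shared instance across all tasks, so whichever task calls
// init() last overwrites the context field the other tasks rely on.
final MyProcessor shared = new MyProcessor();
final ProcessorSupplier<String, String> broken = () -> shared;

// Correct: get() returns a fresh instance, so each task keeps its own
// task-specific context.
final ProcessorSupplier<String, String> correct = MyProcessor::new;
{code}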


was (Author: fml2):
{quote}ie, the supplier must return a new object each time it's invoked
{quote}
Yes, that must be the error! I return the same instance in each supplier 
invokation. The docs does not express it clearly enough IMO that a NEW instance 
must be returned each time.

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9270) KafkaStream crash on offset commit failure

2021-02-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9270.

Resolution: Fixed

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production servers we intermittently observe Kafka Streams crash 
> with a TimeoutException while committing offsets. The only workaround seems to 
> be restarting the application, which is not a desirable solution for a 
> production environment.
>  
> We have already implemented a ProductionExceptionHandler, but it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan
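
For context, a minimal sketch of such a handler (illustration only; the class name is a placeholder). Note that this hook is consulted for errors while producing records, which would explain why it does not catch the offset-commit timeout reported above:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class ContinueOnTimeoutHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Swallow send-side timeouts; fail on anything else.
        return exception instanceof TimeoutException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}

It is registered via the {{default.production.exception.handler}} config.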



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2021-02-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9274.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Gracefully handle timeout exceptions on Kafka Streams
> -
>
> Key: KAFKA-9274
> URL: https://issues.apache.org/jira/browse/KAFKA-9274
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 2.8.0
>
>
> Right now Streams does not treat timeout exceptions as retriable in general, 
> instead throwing them to the application level. If not handled by the user, this 
> unfortunately kills the stream thread.
> In fact, timeouts happen mostly due to network issues or server-side 
> unavailability, so a hard failure on the client seems to be overkill.
> We would like to discuss the best practice for handling timeout exceptions 
> in Streams. The current state is still brainstorming; this ticket consolidates 
> all the cases that involve a timeout exception.
> This ticket is backed by KIP-572: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
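
For illustration, the 2.8.0 handler added with KIP-671 lets an application replace a failed thread instead of letting it die; a minimal sketch (the {{streams}} variable is an already-built {{KafkaStreams}} instance):

{code:java}
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

streams.setUncaughtExceptionHandler(exception -> {
    // Replace the failed stream thread rather than shutting down the client.
    return StreamThreadExceptionResponse.REPLACE_THREAD;
});
{code}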



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-19 Thread GitBox


mjsax merged pull request #10072:
URL: https://github.com/apache/kafka/pull/10072


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287375#comment-17287375
 ] 

Matthias J. Sax commented on KAFKA-12328:
-

We actually just updated the JavaDocs for the upcoming 2.8.0 release: 
https://issues.apache.org/jira/browse/KAFKA-10036 

In general, the idea of the "supplier pattern" is _inherently_ to return a new 
object each time it's invoked. Otherwise, the API could be simpler, i.e., you 
could just pass a `Processor` object directly (instead of a supplier). But it 
seems that many people don't know about the "supplier pattern"...
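
Concretely, a one-line sketch ({{MyProcessor}} is a placeholder): because the topology API takes a supplier, each stream task invokes {{get()}} and receives its own instance.

{code:java}
// Each task calls get() once, so every task owns a distinct Processor.
topology.addProcessor("my-processor", MyProcessor::new, "my-source");
{code}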

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

2021-02-19 Thread GitBox


kkonstantine commented on a change in pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#discussion_r579485651



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
##
@@ -82,6 +82,7 @@ public long deadline() {
 
 public static void handleMetadataErrors(MetadataResponse response) {
 for (TopicMetadata tm : response.topicMetadata()) {
+if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();

Review comment:
   nit: a stylistic observation: this call is exactly the same as the call 
below, yet it's written differently. 
   
   Styles differ, usually not by much, from module to module. I think it's 
good to keep the existing style, to help with readability, when the changes 
don't require a broader restructuring. 

   ```suggestion
   if (shouldRefreshMetadata(tm.error())) {
   throw tm.error().exception();
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287372#comment-17287372
 ] 

fml2 commented on KAFKA-12328:
--

{quote}ie, the supplier must return a new object each time it's invoked
{quote}
Yes, that must be the error! I return the same instance in each supplier 
invokation. The docs does not express it clearly enough IMO that a NEW instance 
must be returned each time.

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number into them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12169) Consumer can not know partitions change when client leader restarts with static membership protocol

2021-02-19 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287357#comment-17287357
 ] 

Boyang Chen commented on KAFKA-12169:
-

[~guozhang] One question regarding T1, when we receive the assignment, do we 
validate its total partitions against our own metadata? If so, the leader 
should attempt a rejoin IMHO.

> Consumer can not know partitions change when client leader restarts with static 
> membership protocol
> -
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>  Labels: bug
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on Kubernetes, and the services often restart for 
> operational reasons. When we increased the topic's partitions from 1000 to 
> 2000 while the client leader restarted with an unknown member id, we found 
> that the consumers did not trigger a rebalance and still consumed 1000 
> partitions.
>  
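
For reference, a minimal consumer setup matching the described scenario (a sketch; the ids are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

final Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Static membership: reusing the same instance id across restarts is what
// suppresses the rebalance that would otherwise pick up the new partitions.
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-pod-0");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
{code}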



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579464755



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -397,6 +397,9 @@ class BrokerServer(
   info("shutting down")
 
   if (config.controlledShutdownEnable) {
+// Shut down the broker metadata listener, so that we don't get added 
to any
+// more ISRs.
+brokerMetadataListener.beginShutdown()

Review comment:
   It does need to be, because there are some paths through the code that 
don't go through here. In general, calling `beginShutdown` or `close` multiple 
times is harmless; only the first call has an effect.
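
   For illustration, the usual guard that makes repeated calls safe (a sketch, not the actual listener code; it uses `java.util.concurrent.atomic.AtomicBoolean`):

   ```java
   private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

   public void beginShutdown() {
       // compareAndSet lets only the first caller run the shutdown logic;
       // subsequent beginShutdown()/close() calls return immediately.
       if (!shuttingDown.compareAndSet(false, true)) {
           return;
       }
       // ... wake the event queue so it can drain in-flight work and exit ...
   }
   ```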





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch merged pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-19 Thread GitBox


rhauch merged pull request #10158:
URL: https://github.com/apache/kafka/pull/10158


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579463797



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579462462



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-19 Thread GitBox


hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-782330736


   We are missing the handling of authentication and unsupported version 
exceptions in BrokerToControllerChannelManager. I am working on the test cases 
now and will submit an update shortly.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579458466



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private fin

[GitHub] [kafka] ableegoldman commented on pull request #10151: MINOR: Correct warning

2021-02-19 Thread GitBox


ableegoldman commented on pull request #10151:
URL: https://github.com/apache/kafka/pull/10151#issuecomment-782323389


   Merged to trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10151: MINOR: Correct warning

2021-02-19 Thread GitBox


ableegoldman merged pull request #10151:
URL: https://github.com/apache/kafka/pull/10151


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12169) Consumer can not know partitions change when client leader restarts with static membership protocol

2021-02-19 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12169:
--
Labels: bug  (was: )

> Consumer can not know partitions change when client leader restarts with static 
> membership protocol
> -
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>  Labels: bug
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on Kubernetes, and the services often restart for 
> operational reasons. When we increased the topic's partitions from 1000 to 
> 2000 while the client leader restarted with an unknown member id, we found 
> that the consumers did not trigger a rebalance and still consumed 1000 
> partitions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10151: MINOR: Correct warning

2021-02-19 Thread GitBox


ableegoldman commented on pull request #10151:
URL: https://github.com/apache/kafka/pull/10151#issuecomment-782322194


   Some unrelated test failures:
   ```
   Build / JDK 15 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining
   Build / JDK 8 / 
kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitions()
   Build / JDK 8 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10149: AbstractCoordinator should log with its subclass

2021-02-19 Thread GitBox


chia7712 commented on pull request #10149:
URL: https://github.com/apache/kafka/pull/10149#issuecomment-782314292


   > Or are you suggesting we apply this change more broadly?
   
   Yep, I feel this change can be applied more broadly :)
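   
   A minimal sketch of the broader pattern being suggested (hypothetical
   snippet, not the actual diff): let the abstract class take its logger from
   the runtime class, so subclasses such as `ConsumerCoordinator` show up in
   the logs.
   
   ```java
   import org.apache.kafka.common.utils.LogContext;
   import org.slf4j.Logger;
   
   public abstract class AbstractCoordinator {
       protected final Logger log;
   
       protected AbstractCoordinator(final LogContext logContext) {
           // getClass() resolves to the concrete subclass at runtime,
           // e.g. ConsumerCoordinator or WorkerCoordinator.
           this.log = logContext.logger(getClass());
       }
   }
   ```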



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-02-19 Thread GitBox


mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579446056



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -247,6 +268,27 @@ public void 
shouldDetermineInternalTopicBasedOnTopicName1() {
 
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
 }
 
+@Test
+public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() 
{
+final MockConsumer emptyConsumer = new 
EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+final Map endOffsets = new HashMap<>();
+endOffsets.put(topicPartition, 0L);
+emptyConsumer.updateEndOffsets(endOffsets);
+
+final Map beginningOffsets = new HashMap<>();
+beginningOffsets.put(topicPartition, 0L);
+emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+final long yesterdayTimestamp = 
Instant.now().minus(Duration.ofDays(1)).toEpochMilli();
+
+streamsResetter.resetToDatetime(emptyConsumer, inputTopicPartitions, 
yesterdayTimestamp);
+
+final ConsumerRecords records = 
emptyConsumer.poll(Duration.ofMillis(500));
+assertEquals(0, records.count());

Review comment:
   Same comment as above.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,25 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+final MockConsumer emptyConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+final Map endOffsets = new HashMap<>();
+endOffsets.put(topicPartition, 0L);
+emptyConsumer.updateEndOffsets(endOffsets);
+
+final Map beginningOffsets = new HashMap<>();
+beginningOffsets.put(topicPartition, 0L);
+emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 
2L);
+
+final ConsumerRecords records = 
emptyConsumer.poll(Duration.ofMillis(500));
+assertEquals(0, records.count());

Review comment:
   Not sure I understand this test. Since we use a `MockConsumer` and never 
call `addRecord()`, this condition should be `true` independent of 
`StreamsResetter`.
   
   Should we not rather verify that `streamsResetter` did _commit_ offsets as 
expected?
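   
   As a hedged illustration of that suggestion (the names and the expected
   position are assumptions, not the actual test): since a `MockConsumer` with
   no `addRecord()` calls always polls empty, the test could assert on the
   consumer's position after the reset instead.
   
   ```java
   import static org.junit.Assert.assertEquals;
   
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.MockConsumer;
   import org.apache.kafka.clients.consumer.OffsetResetStrategy;
   import org.apache.kafka.common.TopicPartition;
   
   final TopicPartition tp = new TopicPartition("input", 0);
   final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
   consumer.assign(Collections.singletonList(tp));
   consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
   consumer.updateEndOffsets(Collections.singletonMap(tp, 0L));
   
   // Stand-in for the resetter's seek on an empty partition; the meaningful
   // check is the resulting position, not the (necessarily empty) poll.
   consumer.seekToEnd(Collections.singletonList(tp));
   assertEquals(0L, consumer.position(tp));
   ```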

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer client,
 final Map topicPartitionsAndOffset 
= client.offsetsForTimes(topicPartitionsAndTimes);
 
 for (final TopicPartition topicPartition : inputTopicPartitions) {
-client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());
+final Optional partitionOffset = 
Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+.map(OffsetAndTimestamp::offset)
+.filter(offset -> offset != 
ListOffsetsResponse.UNKNOWN_OFFSET);
+if (partitionOffset.isPresent()) {
+client.seek(topicPartition, partitionOffset.get());
+} else {
+client.seekToEnd(Collections.singletonList(topicPartition));
+System.out.println("Partition " + topicPartition.partition() + 
" from topic " + topicPartition.topic() +
+" is empty, without a committed record. Falling back 
to latest known offset.");

Review comment:
   > without a committed record
   
   Not sure what this means? Does `is empty` need any clarification?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##
@@ -76,6 +78,25 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 assertEquals(3, records.count());
 }
 
+@Test
+public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
   This test seems to be orthogonal to the fix. I was just wondering why we 
add it and what its purpose is. It's always good to close testing gaps; I'm 
just not sure I understand what this test really tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-19 Thread GitBox


ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r579430512



##
File path: docs/streams/upgrade-guide.html
##
@@ -127,6 +127,15 @@ Streams API
 into the constructor, it is no longer required to set mandatory 
configuration parameters
 (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument";>KIP-680).
 
+
+Kafka Streams is now handling TimeoutException thrown by 
the consumer, producer, and admin client.
+If a timeout occurs on a task, Kafka Streams moves to the next task 
and retries to make progress on the failed
+task in the next iteration.
+To bound how long Kafka Streams retries a task, you can set 
task.timeout.ms (default is 5 minutes).
+If a task does not make progress within the specified task timeout 
(the timeout it tracked on a per-task basis)

Review comment:
   nit: the timeout _is_ tracked on a per-task basis

##
File path: docs/streams/upgrade-guide.html
##
@@ -127,6 +127,15 @@ Streams API
 into the constructor, it is no longer required to set mandatory 
configuration parameters
 (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument";>KIP-680).
 
+
+Kafka Streams is now handling TimeoutException thrown by 
the consumer, producer, and admin client.
+If a timeout occurs on a task, Kafka Streams moves to the next task 
and retries to make progress on the failed
+task in the next iteration.
+To bound how long Kafka Streams retries a task, you can set 
task.timeout.ms (default is 5 minutes).
+If a task does not make progress within the specified task timeout 
(the timeout it tracked on a per-task basis)

Review comment:
   or better yet
   ```suggestion
   If a task does not make progress within the specified task timeout, 
which is tracked on a per-task basis,
   ```
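   
   For illustration, a hedged sketch of setting this config in code (assuming
   the KIP-572 constant name in `StreamsConfig`):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   
   final Properties props = new Properties();
   // Bound how long Streams keeps retrying a timed-out task; default is 5 minutes.
   props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300_000L);
   ```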





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579438984



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;

[GitHub] [kafka] gardnervickers closed pull request #6742: Improve performance of checkpointHighWatermarks, patch 2/2

2021-02-19 Thread GitBox


gardnervickers closed pull request #6742:
URL: https://github.com/apache/kafka/pull/6742


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress

2021-02-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12347:

Description: 
Add metrics to track records being consumed fully and to tell if tasks are 
idling. This will allow users of streams to build uptime metrics around streams 
with less difficulty.

KIP-715: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams
 

  was:Add metrics to track records being consumed fully and to tell if tasks 
are idling. This will allow users of streams to build uptime metrics around 
streams with less difficulty.


> Improve Kafka Streams ability to track progress
> ---
>
> Key: KAFKA-12347
> URL: https://issues.apache.org/jira/browse/KAFKA-12347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip
> Fix For: 3.0.0
>
>
> Add metrics to track records being consumed fully and to tell if tasks are 
> idling. This will allow users of streams to build uptime metrics around 
> streams with less difficulty.
> KIP-715: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress

2021-02-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12347:

Labels: kip  (was: need-kip)

> Improve Kafka Streams ability to track progress
> ---
>
> Key: KAFKA-12347
> URL: https://issues.apache.org/jira/browse/KAFKA-12347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip
> Fix For: 3.0.0
>
>
> Add metrics to track records being consumed fully and to tell if tasks are 
> idling. This will allow users of streams to build uptime metrics around 
> streams with less difficulty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287324#comment-17287324
 ] 

Matthias J. Sax edited comment on KAFKA-12328 at 2/19/21, 7:44 PM:
---

If this happens in "steady state" it would be very suspicious. You can monitor 
/ verify the task assignment from the logs; that should give you more intel. 

The other thing coming to mind is that it could be a programming error: you 
cannot share a single `Processor` instance (ie, the supplier must return a new 
object each time it's invoked). You should also not have any shared state (like 
a `static` variable). If you share the same processor instance between two 
tasks, the tasks are no longer isolated (as they should be), which could lead 
to what you observe.
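
To make the sharing pitfall concrete, a minimal hypothetical sketch ({{MyProcessor}} is an assumed user class, not taken from this ticket):

{code:java}
final Topology topology = new Topology();

// Correct: the supplier returns a fresh Processor per invocation, so each
// task gets its own isolated instance.
topology.addProcessor("proc", MyProcessor::new, "source");

// Anti-pattern: a single instance shared by all tasks; any per-record state
// leaks across partitions, which can look like the behavior described above.
final MyProcessor shared = new MyProcessor();
topology.addProcessor("proc2", () -> shared, "source");
{code}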


was (Author: mjsax):
If this happens in "steady state" it would be very suspicious. You can monitor 
/ verify the task assignment from the logs; that should give you more intel. 

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer, but the solution is quite complicated, hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> Java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number in them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579433300



##
File path: settings.gradle
##
@@ -29,6 +29,7 @@ include 'clients',
 'log4j-appender',
 'metadata',
 'raft',
+'shell',

Review comment:
   It would be nice to have it as just "shell" until we get another 
shell.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12328) Expose TaskId partition number

2021-02-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287324#comment-17287324
 ] 

Matthias J. Sax commented on KAFKA-12328:
-

If this happens in "steady state" it would be very suspicious. You can monitor 
/ verify the task assignment from the logs; that should give you more intel. 

> Expose TaskId partition number
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>  Labels: needs-kip
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer, but the solution is quite complicated, hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> Java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number in them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.
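
One possible workaround, sketched here as an assumption rather than quoted from the linked answer: in this version of the API, the partition can be recovered from the task id inside {{init()}}, since a task processes one partition per input topic.

{code:java}
private int partition;

@Override
public void init(final ProcessorContext context) {
    // TaskId.partition is a public field here; unlike context.partition(),
    // it remains usable from wall-clock punctuators.
    this.partition = context.taskId().partition;
}
{code}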



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579432108



##
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of 
memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {

Review comment:
   Yes, this should not be present twice.  Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579431376



##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
   yes, let's do that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2021-02-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287316#comment-17287316
 ] 

Matthias J. Sax edited comment on KAFKA-7499 at 2/19/21, 7:34 PM:
--

[~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe 
[~high.lee] wants to take it over (asked about it above).


was (Author: mjsax):
[~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe 
[~high.lee] wants to take it over (he asked about it above).

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data is 
> handed to the producer, and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
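
For context, a minimal sketch of a KIP-210-style handler as the interface exists today ({{SkipBadRecordsHandler}} is a hypothetical name; the serialization coverage proposed in this ticket is not part of the interface yet):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipBadRecordsHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Today this is only invoked for errors raised on the send/callback
        // path; serialization failures never reach it, which is the gap.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
// Registered via StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG.
{code}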



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2021-02-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287316#comment-17287316
 ] 

Matthias J. Sax commented on KAFKA-7499:


[~aniket0710] -- I think [~jbfletch] never got to open a PR for it. Maybe 
[~high.lee] wants to take it over (he asked about it above).

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data is 
> handed to the producer, and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2021-02-19 Thread jbfletch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jbfletch reassigned KAFKA-7499:
---

Assignee: (was: jbfletch)

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data is 
> handed to the producer, and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579411851



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;

[GitHub] [kafka] bbejeck commented on pull request #10150: KAFKA-3745: Add access to read-only key in value joiner

2021-02-19 Thread GitBox


bbejeck commented on pull request #10150:
URL: https://github.com/apache/kafka/pull/10150#issuecomment-782288891


   Java 11 passed
   Java 8 and Java 15 failed with 
   ```
   JDK 15 
 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
   JDK 8 
 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
 kafka.api.ConsumerBounceTest.testClose()
   
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12332) Error partitions from topics with invalid IDs in LISR requests

2021-02-19 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12332.
-
Resolution: Fixed

> Error partitions from topics with invalid IDs in LISR requests
> --
>
> Key: KAFKA-12332
> URL: https://issues.apache.org/jira/browse/KAFKA-12332
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> In a situation where topics are deleted and recreated in a short amount of 
> time, LeaderAndIsr requests can contain topics with invalid IDs, but correct 
> epochs. In this case, we will incorrectly handle the request and simply log 
> an error message on the broker. It will be more useful to not handle the 
> request for the partition with an invalid ID and send an error back to the 
> controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

2021-02-19 Thread GitBox


hachikuji merged pull request #10143:
URL: https://github.com/apache/kafka/pull/10143


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe closed pull request #10155: Fix Raft broker restart issue when offset partitions are deferred

2021-02-19 Thread GitBox


cmccabe closed pull request #10155:
URL: https://github.com/apache/kafka/pull/10155


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred

2021-02-19 Thread GitBox


cmccabe commented on pull request #10155:
URL: https://github.com/apache/kafka/pull/10155#issuecomment-782280589


   committed to trunk and 2.8 after running tests locally



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10105: Kip500 full

2021-02-19 Thread GitBox


rondagostino commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r579407572



##
File path: build.gradle
##
@@ -1029,6 +1029,7 @@ project(':metadata') {
 compile project(':clients')
 compile libs.jacksonDatabind
 compile libs.jacksonJDK8Datatypes
+compile libs.metrics

Review comment:
   > file a JIRA to discuss whether the metadata module should use Kafka 
metrics or Yammer metrics.
   
   https://issues.apache.org/jira/browse/KAFKA-12348
   
   cc: @cmccabe 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12348) The metadata module currently uses Yammer metrics. Should it use Kafka metrics instead?

2021-02-19 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-12348:
-

 Summary: The metadata module currently uses Yammer metrics.  
Should it use Kafka metrics instead?
 Key: KAFKA-12348
 URL: https://issues.apache.org/jira/browse/KAFKA-12348
 Project: Kafka
  Issue Type: Task
  Components: metrics
Affects Versions: 3.0.0, 2.8.0
Reporter: Ron Dagostino






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-19 Thread GitBox


kkonstantine commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r579403166



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -366,6 +346,42 @@ private void readToLogEnd() {
 }
 }
 
+// Visible for testing
+Map readEndOffsets(Set assignment) {
+log.trace("Reading to end of offset log");
+
+Map endOffsets;
+// Note that we'd prefer to not use the consumer to find the end 
offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked 
waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 
'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check 
whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should 
use the supplied admin client
+// (if available), which prevents 'consumer.endOffsets(...)' from blocking.
+
+// Deprecated constructors do not provide an admin supplier, so the 
admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for 
the assigned topic partitions.
+// Unlike using the consumer
+try {
+return admin.endOffsets(assignment);
+} catch (UnsupportedVersionException e) {
+// This may happen with really old brokers that don't support 
the auto topic creation
+// field in metadata requests
+log.debug("Reading to end of log offsets with consumer since 
admin client is unsupported: {}", e.getMessage());
+useAdminForListOffsets = false;

Review comment:
   Good point. As long as `SharedTopicAdmin` admin won't be recycled. 
That's negligible anyways, I was referring to the pattern mainly. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException

2021-02-19 Thread GitBox


kkonstantine commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r579395276



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -161,6 +164,7 @@ public void start() {
 
 // Create the topic admin client and initialize the topic ...
 admin = topicAdminSupplier.get();   // may be null
+useAdminForListOffsets = admin != null;

Review comment:
   My main observation here is that keeping initialization in the 
constructor allows us to declare read-only fields as `final`. 
   It also offers better exception safety, in the sense that once an object is 
constructed (the call to its constructor succeeds), we know that we more or 
less have a functional instance of that class. 
   
   I'm not ignoring the cases where the initialization of variables has to 
happen later, during the call to `start`. But hopefully these cases 
are minimal, or else `start` ends up being the actual constructor of objects 
like this one, which can be redundant and in some cases riskier. 
   
   As I mentioned, since `admin` is not immutable here (it can be set to `null` 
later to support connecting to older brokers), I'm fine keeping this call in 
`start`. But I'd still recommend not pushing initializations outside the 
constructor if they can happen at the time of object construction (as is 
the case with `admin`'s initial assignment here). 
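   
   A tiny sketch of the trade-off being described (hypothetical names, not
   `KafkaBasedLog` itself):
   
   ```java
   import java.util.function.Supplier;
   import org.apache.kafka.clients.admin.Admin;
   
   // Constructor initialization: the field can be final, and a successfully
   // constructed object is known to be usable.
   class EagerLog {
       private final Admin admin;
       EagerLog(final Supplier<Admin> adminSupplier) {
           this.admin = adminSupplier.get();
       }
   }
   
   // Initialization deferred to start(): the field cannot be final, and the
   // object is only half-built until start() has been called.
   class LazyLog {
       private Admin admin;
       void start(final Supplier<Admin> adminSupplier) {
           this.admin = adminSupplier.get();
       }
   }
   ```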





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-19 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r579378306



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals 
with topics
+ * and partitions. It is responsible for managing the in-sync replica set and 
leader
+ * of each partition, as well as administrative tasks like creating or 
deleting topics.
+ */
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+