[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-10 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573916465



##
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def 

[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-09 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573201310



##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -39,11 +41,30 @@ object MetadataPartition {
      record.isr(),
      Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
      Collections.emptyList(),
-      Collections.emptyList())
+      Collections.emptyList(),
+      largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
+      isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
   As discussed offline, we will eliminate that information from the messages we log when applying deferred changes, and we won't carry it around in `MetadataPartition`.  Currently `RaftReplicaManager` knows whether it is deferring changes or not.  Maybe later, once `BrokerLifecycleManager` and `BrokerMetadataListener` are committed, we can think about where a global boolean might live to identify whether the broker is fenced.  It isn't critical to decide right now because we are only going to defer the application of partition metadata at startup in 2.8.

##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._
 
 
 object MetadataPartition {
-  def apply(name: String, record: PartitionRecord): MetadataPartition = {
+  val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive)

Review comment:
   As discussed offline, we will remove this and not include the last-seen offset in log messages when applying deferred changes.






[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-08 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572442503



##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -39,11 +41,30 @@ object MetadataPartition {
      record.isr(),
      Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
      Collections.emptyList(),
-      Collections.emptyList())
+      Collections.emptyList(),
+      largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
+      isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
   We basically use `largestDeferredOffsetEverSeen` only for logging at this point -- we also check it in a few `private def sanityCheckState...()` `RaftReplicaManager` methods.  We could completely eliminate `largestDeferredOffsetEverSeen` if we didn't want to log when the partition was last deferred.  It just tracks the last offset at which the partition was seen and the change at that offset was deferred rather than directly applied.  Once the partition is no longer deferred, the value remains whatever it was and the boolean flips to `false`.

   It does seem on the surface that we could change the declaration to `deferredSinceOffset` and get rid of the boolean -- `deferredSinceOffset` would change to `-1` once those changes are applied.  But there is a problem with this if the partition changes to not being deferred in the metadata cache before we ask `RaftReplicaManager` to process all of its deferred changes: the value will be `-1` in the metadata cache under those circumstances, and we wouldn't have the value to log.

   So I think we have a few options (the two representations are sketched after this list).

   1. Do the logging, apply the changes to the metadata cache before the replica manager, and keep the `Long` and `Boolean` as currently defined.
   2. Do the logging, apply the changes to the metadata cache **after** the replica manager, and use just a `Long` (with the semantics changed as described above).
   3. Just use a `Boolean` and don't do the logging.
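
   For concreteness, here is a rough sketch of the two representations (the case-class names are hypothetical; the actual fields live on `MetadataPartition`):

    // Option 1: two fields. The offset survives after deferral ends, so it
    // is still available for logging when the deferred changes are applied.
    case class DeferralStateTwoFields(
      largestDeferredOffsetEverSeen: Long,  // last offset at which a change was deferred
      isCurrentlyDeferringChanges: Boolean  // flips to false once changes are applied
    )

    // Option 2: a single Long with a -1 sentinel. Simpler, but the offset is
    // erased as soon as the metadata cache marks the partition applied, so
    // the cache must be updated only after the replica manager has logged it.
    case class DeferralStateOneField(deferredSinceOffset: Long) {
      def isCurrentlyDeferringChanges: Boolean = deferredSinceOffset >= 0
    }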
   







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571309365



##
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: MetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val partitionsMadeFollower = mutable.Set[Partition]()
+      val partitionsMadeLeader = mutable.Set[Partition]()
+      val leadershipChangeCallbacks =
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        val 
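
A minimal, self-contained sketch of the deferral lifecycle that `beginMetadataChangeDeferral` and `endMetadataChangeDeferral` imply, assuming only the flag semantics described in the code comments above (the class name is hypothetical):

    // Toy stand-in for the deferral lifecycle (illustrative only; the real
    // class guards state with replicaStateChangeLock and replays deferred
    // leader/follower changes when deferral ends).
    class DeferralFlagSketch {
      private var deferring = true  // metadata changes are deferred initially

      def beginMetadataChangeDeferral(): Unit = synchronized {
        deferring = true            // e.g. the broker loses its lease
      }

      def endMetadataChangeDeferral(): Unit = synchronized {
        // the real method applies all deferred partition changes here
        deferring = false
      }

      def isDeferringMetadataChanges: Boolean = synchronized { deferring }
    }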
