[jira] [Commented] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607679#comment-16607679
 ] 

ASF GitHub Bot commented on KAFKA-6082:
---

lindong28 closed pull request #5101: KAFKA-6082: Fence zookeeper updates with controller epoch zkVersion
URL: https://github.com/apache/kafka/pull/5101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a92340f2a4b..f3c81efb0b5 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -70,7 +70,7 @@ class Partition(val topic: String,
    * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
-  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
   private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index f4671cfaa1a..20c3de0c1e5 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -28,8 +28,8 @@ class ControllerContext {
   var controllerChannelManager: ControllerChannelManager = null
 
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
-  var epoch: Int = KafkaController.InitialControllerEpoch - 1
-  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
+  var epoch: Int = KafkaController.InitialControllerEpoch
+  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
   var allTopics: Set[String] = Set.empty
   private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
   val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 13967e029ed..c93e9e79ec2 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.utils.Time
 
 import scala.collection._
@@ -32,12 +33,14 @@ object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
 class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
-                             eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
+                             eventProcessedListener: ControllerEvent => Unit,
+                             controllerMovedListener: () => Unit) extends KafkaMetricsGroup {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
-  private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+  // Visible for test
+  private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
   private val time = Time.SYSTEM
 
   private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
@@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
               controllerEvent.process()
             }
           } catch {
+            case e: ControllerMovedException =>
+              info(s"Controller moved to another broker when processing $controllerEvent.", e)
+              controllerMovedListener()
             case e: Throwable => error(s"Error processing event $controllerEvent", e)
           }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 286768f595b..379e66da9e0 100644
--- a/
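The `ControllerEventManager` hunk above sends a `ControllerMovedException` raised while processing an event to a new `controllerMovedListener` callback. Before this change, such an exception would only have been logged as a generic error. The following is a minimal, self-contained sketch of that dispatch pattern. `SimpleEventManager`, `NamedEvent` and the local `ControllerMovedException` class are hypothetical stand-ins; Kafka's real exception is `org.apache.kafka.common.errors.ControllerMovedException`. Events are drained on the calling thread rather than on a dedicated `controller-event-thread`, and the sketch uses `NonFatal` where the diff catches `Throwable`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical local stand-in so the sketch has no Kafka dependency.
final class ControllerMovedException(msg: String) extends RuntimeException(msg)

trait ControllerEvent { def process(): Unit }

// Hypothetical event type: a name for logging plus the work to run.
final case class NamedEvent(name: String, body: () => Unit) extends ControllerEvent {
  def process(): Unit = body()
  override def toString: String = name
}

// Hypothetical single-threaded manager mirroring the diff's two catch clauses.
final class SimpleEventManager(controllerMovedListener: () => Unit) {
  private val queue = new LinkedBlockingQueue[ControllerEvent]

  def put(event: ControllerEvent): Unit = queue.put(event)

  // Processes queued events one at a time on the calling thread.
  def drain(): Unit = {
    var event = queue.poll()
    while (event != null) {
      try event.process()
      catch {
        // A fenced write failed: another controller now owns the epoch.
        case e: ControllerMovedException =>
          println(s"Controller moved to another broker when processing $event: ${e.getMessage}")
          controllerMovedListener()
        // Any other non-fatal failure is only logged; the loop keeps going.
        case NonFatal(e) =>
          println(s"Error processing event $event: $e")
      }
      event = queue.poll()
    }
  }
}

// Usage: the first event hits a fenced write; the second still runs.
var controllerMovedCalls = 0
val processed = mutable.Buffer.empty[String]
val manager = new SimpleEventManager(() => controllerMovedCalls += 1)
manager.put(NamedEvent("StaleWrite", () => throw new ControllerMovedException("controller epoch zkVersion changed")))
manager.put(NamedEvent("NextEvent", () => processed += "NextEvent"))
manager.drain()
```

Handling `ControllerMovedException` separately lets a failed fenced write act as a signal that controllership was lost, not as an unexpected error. What Kafka's own listener does with that signal is not visible in this part of the diff; here it just counts invocations.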

[jira] [Commented] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495880#comment-16495880
 ] 

ASF GitHub Bot commented on KAFKA-6082:
---

hzxa21 opened a new pull request #5101: KAFKA-6082: Fence zookeeper updates with controller epoch zkVersion
URL: https://github.com/apache/kafka/pull/5101
 
 
   This PR aims to enforce that the controller can only update zookeeper state after checking the controller epoch zkVersion. The check and the zookeeper state updates are wrapped in a single zookeeper multi() operation so that they are applied atomically. This PR is necessary to resolve issues involving multiple controllers (i.e. an old controller updating zookeeper state before it resigns, which is possible during controller failover given our single-threaded event queue model).
   
   This PR includes the following changes:
   - Add MultiOp request and response in ZookeeperClient
   - Ensure that all zookeeper updates made by the controller are protected by checking the current controller epoch zkVersion
   - Modify test cases in KafkaZkClientTest to test a mismatched controller epoch zkVersion
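The check-then-write fencing described above can be illustrated without a live ensemble. Below is a minimal sketch that uses a hypothetical in-memory `VersionedStore` in place of ZooKeeper. Its `multi()` validates a `Check` on `/controller_epoch` together with the `SetData` it guards, then applies all of them or none. That all-or-nothing behavior is the property the PR relies on from ZooKeeper's `multi()` with `Op.check`/`Op.setData`. `VersionedStore`, `Check`, `SetData`, `OpFailedException` and `fencedSetData` are illustrative names only, not Kafka's `ZookeeperClient`/`KafkaZkClient` API. The topic `t` in the partition state path is made up.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper: every path holds data plus a
// version (its zkVersion) that increments on each successful write.
final case class Node(data: String, version: Int)

sealed trait Op
// Passes only if `path` exists and has exactly `expectedVersion` (-1 = any).
final case class Check(path: String, expectedVersion: Int) extends Op
// Overwrites `path`; the default expectedVersion of -1 accepts any version.
final case class SetData(path: String, data: String, expectedVersion: Int = -1) extends Op

final class OpFailedException(msg: String) extends RuntimeException(msg)

final class VersionedStore {
  private val nodes = mutable.Map.empty[String, Node]

  def create(path: String, data: String): Unit = synchronized { nodes(path) = Node(data, 0) }
  def get(path: String): Node = synchronized { nodes(path) }

  private def versionMatches(path: String, expected: Int): Boolean =
    nodes.get(path).exists(n => expected == -1 || n.version == expected)

  // All-or-nothing: every op is validated before any write is applied.
  // Simplification: ops are validated against the state before the batch.
  def multi(ops: Seq[Op]): Unit = synchronized {
    ops.foreach {
      case Check(p, v) if !versionMatches(p, v)      => throw new OpFailedException(s"check failed on $p")
      case SetData(p, _, v) if !versionMatches(p, v) => throw new OpFailedException(s"setData failed on $p")
      case _                                         => ()
    }
    ops.foreach {
      case SetData(p, d, _) => nodes(p) = Node(d, nodes(p).version + 1)
      case _: Check         => ()
    }
  }
}

val ControllerEpochPath = "/controller_epoch"

// Hypothetical helper: write controller-owned state only if /controller_epoch
// still has the zkVersion this controller saw when it was elected.
def fencedSetData(store: VersionedStore, epochZkVersion: Int, path: String, data: String): Unit =
  store.multi(Seq(Check(ControllerEpochPath, epochZkVersion), SetData(path, data)))

// Usage: an old controller's write is rejected after a new election.
val store = new VersionedStore
val statePath = "/brokers/topics/t/partitions/0/state"
store.create(ControllerEpochPath, "1")
store.create(statePath, "leader=1")

val staleZkVersion = store.get(ControllerEpochPath).version
// A new controller is elected and bumps /controller_epoch (zkVersion 0 -> 1).
store.multi(Seq(SetData(ControllerEpochPath, "2", expectedVersion = staleZkVersion)))

val staleWriteRejected =
  try { fencedSetData(store, staleZkVersion, statePath, "leader=2"); false }
  catch { case _: OpFailedException => true }
val stateAfterRejection = store.get(statePath).data

// The new controller, holding the current zkVersion, can still write.
fencedSetData(store, store.get(ControllerEpochPath).version, statePath, "leader=3")
```

In real ZooKeeper, a failed `check` makes the whole `multi()` call fail with a `KeeperException` (for a version mismatch, `BadVersionException`), and none of the batched operations take effect. The sketch models that outcome with `OpFailedException`.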
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> consider fencing zookeeper updates with controller epoch zkVersion
> --
>
> Key: KAFKA-6082
> URL: https://issues.apache.org/jira/browse/KAFKA-6082
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Priority: Major
>
> If we want, we can use multi-op to fence zookeeper updates with the 
> controller epoch's zkVersion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2017-12-17 Thread Prasanna Gautam (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294391#comment-16294391
 ] 

Prasanna Gautam commented on KAFKA-6082:


[~onurkaraman] Does this require fencing all ZK updates from the controller and brokers, or only some subset of changes?
