[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523970#comment-16523970
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 closed pull request #5293: KAFKA-6949; alterReplicaLogDirs() should 
grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
     * @return true iff the future replica is created
     */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-      if (isFuture)
-        getReplicaOrException(Request.Futur
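
The check-then-recheck shape of maybeReplaceCurrentWithFutureReplica() in the diff above may be easier to follow in isolation. The following is only a minimal sketch under stated assumptions: `DoubleCheckSketch`, `Offsets`, `setFuture` and `appendToCurrent` are hypothetical names, and plain `Long` offsets stand in for Kafka's replicas and logs. It shows a cheap comparison without the lock, followed by the same comparison repeated under the write lock before anything is changed.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical model: offsets only, no real logs. Not Kafka's actual classes.
object DoubleCheckSketch {
  final class Offsets(initialCurrentLeo: Long) {
    private val lock = new ReentrantReadWriteLock()
    // @volatile so the unlocked first check sees recent writes.
    @volatile private var currentLeo: Long = initialCurrentLeo
    @volatile private var futureLeo: Option[Long] = None

    private def inWriteLock[T](body: => T): T = {
      lock.writeLock().lock()
      try body finally lock.writeLock().unlock()
    }

    def setFuture(leo: Long): Unit = inWriteLock { futureLeo = Some(leo) }

    def appendToCurrent(records: Long): Unit = inWriteLock { currentLeo += records }

    // Returns true only if the future copy existed and had caught up
    // at the moment the write lock was held.
    def maybeReplace(): Boolean = {
      // First check: no lock taken, so it may be stale by the time we act on it.
      if (futureLeo.contains(currentLeo)) {
        inWriteLock {
          // Second check: repeated under the lock, where no append can interleave.
          futureLeo match {
            case Some(leo) if leo == currentLeo =>
              futureLeo = None // the future copy has been promoted
              true
            case _ =>
              false // removed or fell behind between the two checks
          }
        }
      } else false
    }
  }
}
```

The unlocked first check keeps the common "not caught up yet" case cheap; only the second check, made while holding the write lock, is trusted.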

[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523297#comment-16523297
 ] 

Dong Lin commented on KAFKA-6949:
-

[~rsivaram] The patch has been merged into trunk. I opened PR 
[https://github.com/apache/kafka/pull/5293] to merge it into 2.0. Do we 
typically need a review from another committer to cherry-pick a patch from 
trunk into a release branch?

Also, I set the fix version of this JIRA to 2.0.0. Please feel free to change 
it if you see fit. Thanks.

 

> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6949
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6949
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Dong Lin
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
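
For readers unfamiliar with the `None.get` frames at the top of the trace above, here is a minimal sketch of how calling `.get` on an empty `Option` produces this exception, next to a variant that keeps the missing value in the return type. `OptionCheckSketch`, `Log`, `Replica`, `unsafeParentDir` and `safeParentDir` are hypothetical stand-ins for illustration, not Kafka's real classes or methods.

```scala
// Hypothetical stand-ins for illustration; these are not Kafka's real classes.
object OptionCheckSketch {
  final case class Log(parentDir: String)
  final class Replica(var log: Option[Log])

  // Same shape as the failing access: throws java.util.NoSuchElementException
  // when `log` is None, for example after the log was handed to another replica.
  def unsafeParentDir(replica: Replica): String =
    replica.log.get.parentDir

  // Keeps the "no log" case in the return type instead of throwing.
  def safeParentDir(replica: Replica): Option[String] =
    replica.log.map(_.parentDir)
}
```

Option checking of this kind only avoids the exception. It does not by itself stop another thread from changing the replica between the check and the use, which is what the partition lock in the issue title addresses.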



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523295#comment-16523295
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 opened a new pull request #5293: KAFKA-6949; alterReplicaLogDirs() 
should grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5293
 
 
   NoSuchElementException is thrown if ReplicaAlterDirThread replaces the 
current replica with the future replica right before the request handler 
thread executes `futureReplica.log.get.dir.getParent` in 
ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition 
lock when the request handler thread checks the destination log directory of 
the future replica.
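
The race and the fix described above can be sketched with a read-write lock. This is a simplified model under stated assumptions, not the code from the patch: `PartitionLockSketch`, `PartitionSketch`, `createFuture`, `currentDir` and `replaceCurrentWithFuture` are hypothetical names, and the real `Partition` class carries far more state. The directory check runs under the read lock and the swap runs under the write lock, so the check can never observe a future replica whose log has already been moved.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical, simplified model of the partition state; not Kafka's real classes.
object PartitionLockSketch {
  final case class Log(parentDir: String)
  final class Replica(var log: Option[Log])

  final class PartitionSketch(current: Replica) {
    private val lock = new ReentrantReadWriteLock()
    private var future: Option[Replica] = None

    private def inReadLock[T](body: => T): T = {
      lock.readLock().lock()
      try body finally lock.readLock().unlock()
    }

    private def inWriteLock[T](body: => T): T = {
      lock.writeLock().lock()
      try body finally lock.writeLock().unlock()
    }

    def createFuture(dir: String): Unit = inWriteLock {
      future = Some(new Replica(Some(Log(dir))))
    }

    def currentDir: Option[String] = inReadLock(current.log.map(_.parentDir))

    // Request handler side: the whole lookup and comparison happen under the
    // read lock, so the swap below cannot interleave with it.
    def futureReplicaDirChanged(newDir: String): Boolean = inReadLock {
      future.flatMap(_.log).exists(_.parentDir != newDir)
    }

    // Alter-dir thread side: move the log and drop the future replica
    // as one step under the write lock.
    def replaceCurrentWithFuture(): Boolean = inWriteLock {
      future match {
        case Some(f) =>
          current.log = f.log
          f.log = None
          future = None
          true
        case None => false
      }
    }
  }
}
```

Without the lock, a reader could fetch the future replica, lose the CPU while the swap sets its log to `None`, and then fail on `.get`. The sketch also avoids `.get` entirely, which is a choice made here for illustration.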
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org







[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523292#comment-16523292
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 closed pull request #5081: KAFKA-6949; alterReplicaLogDirs() should 
grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5081
 
 
   


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491438#comment-16491438
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 opened a new pull request #5081: KAFKA-6949; alterReplicaLogDirs() 
should grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5081
 
 
   NoSuchElementException is thrown if ReplicaAlterDirThread replaces the 
current replica with the future replica right before the request handler 
thread executes `futureReplica.log.get.dir.getParent` in 
ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition 
lock when the request handler thread checks the destination log directory of 
the future replica.
   


> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6949
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6949
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Dong Lin
>            Priority: Major


