[ https://issues.apache.org/jira/browse/SAMZA-948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15282226#comment-15282226 ]
Yi Pan (Data Infrastructure) commented on SAMZA-948: ---------------------------------------------------- Merged and submitted. Thanks! > CoordinatorStreamSystemConsumer is not threadsafe. > -------------------------------------------------- > > Key: SAMZA-948 > URL: https://issues.apache.org/jira/browse/SAMZA-948 > Project: Samza > Issue Type: Bug > Affects Versions: 0.10.1 > Reporter: Jake Maes > Assignee: Jake Maes > Priority: Blocker > Fix For: 0.10.1 > > Attachments: SAMZA-948_1.patch, SAMZA-948_2.patch, SAMZA-948_3.patch, > SAMZA-948_4.patch, SAMZA-948_5.patch, SAMZA-948_6.patch > > > In testing the 10.1 release I found some ConcurrentModificationExceptions > resulting from the SAMZA-913 patch. > It appears that the AM UI, onContainerComplete callback, and probably other > codepaths can cause concurrent coordinator stream bootstraps, which was > always a problem, but now causes a ConcurrentModificationException because > bootstrap calls remove() on the bootstrap messages set. > Here are a couple stack traces illustrating the issue: > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734) > at > org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.getBootstrappedStream(CoordinatorStreamSystemConsumer.java:188) > at > org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.getBootstrappedStream(AbstractCoordinatorStreamManager.java:85) > at > org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:101) > at > org.apache.samza.job.model.JobModel.getContainerToHostValue(JobModel.java:96) > at > org.apache.samza.job.yarn.SamzaTaskManager.onContainerCompleted(SamzaTaskManager.java:210) > at > org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143) > at > org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143) > at > org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.samza.job.yarn.SamzaAppMaster$.onContainersCompleted(SamzaAppMaster.scala:143) > at > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) > java.util.ConcurrentModificationException > at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734) > at > org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.getBootstrappedStream(CoordinatorStreamSystemConsumer.java:188) > at > org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.getBootstrappedStream(AbstractCoordinatorStreamManager.java:85) > at > org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:101) > at > org.apache.samza.job.model.JobModel.getContainerToHostValue(JobModel.java:96) > at > views.$_scalate_$index_scaml$$anonfun$$_scalate_$render$3.apply(index.scaml.scala:230) > at > views.$_scalate_$index_scaml$$anonfun$$_scalate_$render$3.apply(index.scaml.scala:183) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at views.$_scalate_$index_scaml.render(index.scaml.scala:331) > at > org.fusesource.scalate.DefaultRenderContext.capture(DefaultRenderContext.scala:92) > at > org.fusesource.scalate.layout.DefaultLayoutStrategy.layout(DefaultLayoutStrategy.scala:45) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TemplateEngine.scala:559) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply(TemplateEngine.scala:559) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply(TemplateEngine.scala:559) > at org.fusesource.scalate.RenderContext$class.withUri(RenderContext.scala:447) > at > org.fusesource.scalate.DefaultRenderContext.withUri(DefaultRenderContext.scala:30) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply$mcV$sp(TemplateEngine.scala:558) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply(TemplateEngine.scala:555) > at > org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply(TemplateEngine.scala:555) > at org.fusesource.scalate.RenderContext$.using(RenderContext.scala:47) > at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:555) > at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:547) > at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:601) > at > org.scalatra.scalate.ScalateSupport$class.layoutTemplateAs(ScalateSupport.scala:223) > at > org.apache.samza.webapp.ApplicationMasterWebServlet.layoutTemplateAs(ApplicationMasterWebServlet.scala:31) > ... [truncated] -- This message was sent by Atlassian JIRA (v6.3.4#6332)