[ https://issues.apache.org/jira/browse/SAMZA-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281601#comment-15281601 ]

Yi Pan (Data Infrastructure) commented on SAMZA-947:
----------------------------------------------------

+1. Merged and submitted. Thanks!

> TaskAssignmentManager registration exception when partition count changes.
> --------------------------------------------------------------------------
>
>                 Key: SAMZA-947
>                 URL: https://issues.apache.org/jira/browse/SAMZA-947
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.10.1
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>            Priority: Minor
>             Fix For: 0.10.1
>
>         Attachments: SAMZA-947_1.patch, SAMZA-947_2.patch
>
>
> The GroupByContainerCount grouper deletes the persisted task mapping if the
> partition count has changed, because there may now be fewer tasks and the old
> mapping would no longer be valid.
> To delete the mapping, the TaskAssignmentManager registers itself and writes
> null for all the keys. Later, when the recalculated mapping is saved, it tries
> to register itself again, which causes this exception:
> Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
>       at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
>       at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
>       at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
>       at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
>       at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
>       at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
>       at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
>       at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
>       at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
>       at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
>       at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
>       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
>       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> In a YARN environment, the AM restarts, and because the task mapping has
> already been deleted, the second attempt to save the mapping succeeds.
> Since this issue only occurs when the partition count changes and is
> recoverable, I'm marking it as low priority.
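
The failure described above comes from registering the same source with the producer twice: once to delete the old mapping and once to save the new one. The Java sketch below models that behavior and shows one possible way to avoid it, by making registration idempotent with a `registered` flag. It is not necessarily what the attached patches do. `FakeSystemProducer`, `IdempotentTaskAssignmentManager`, `deleteTaskAssignments`, and `writeTaskAssignment` are hypothetical stand-ins rather than Samza's API. The source name is taken from the exception message, and treating a null value as a delete is an assumption that mirrors "writes null for all the keys".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a system producer. Like the producer in the stack
// trace, it rejects a second register() call for the same source.
class FakeSystemProducer {
    private final Set<String> sources = new HashSet<>();
    private final Map<String, String> log = new HashMap<>();

    void register(String source) {
        // Set.add returns false if the source was already present.
        if (!sources.add(source)) {
            throw new IllegalStateException(source + " is already registered with the producer");
        }
    }

    void send(String source, String key, String value) {
        if (!sources.contains(source)) {
            throw new IllegalStateException(source + " is not registered");
        }
        // Assumption: a null value acts as a delete for that key.
        if (value == null) {
            log.remove(key);
        } else {
            log.put(key, value);
        }
    }

    Map<String, String> snapshot() {
        return new HashMap<>(log);
    }
}

// Hypothetical manager whose register() is idempotent, so deleting the old
// mapping and then saving the recalculated one can both call it safely.
class IdempotentTaskAssignmentManager {
    private static final String SOURCE = "SamzaTaskAssignmentManager";
    private final FakeSystemProducer producer;
    private boolean registered = false;

    IdempotentTaskAssignmentManager(FakeSystemProducer producer) {
        this.producer = producer;
    }

    void register() {
        // Only the first call reaches the producer; later calls are no-ops.
        if (!registered) {
            producer.register(SOURCE);
            registered = true;
        }
    }

    void deleteTaskAssignments(Iterable<String> taskNames) {
        register();
        for (String task : taskNames) {
            producer.send(SOURCE, task, null);
        }
    }

    void writeTaskAssignment(String taskName, String containerId) {
        register();
        producer.send(SOURCE, taskName, containerId);
    }
}

public class Main {
    public static void main(String[] args) {
        FakeSystemProducer producer = new FakeSystemProducer();
        IdempotentTaskAssignmentManager manager = new IdempotentTaskAssignmentManager(producer);
        // Delete the stale mapping, then save the recalculated one:
        // without the flag, the second register() would throw.
        manager.deleteTaskAssignments(Arrays.asList("Partition 0"));
        manager.writeTaskAssignment("Partition 0", "1");
        System.out.println(producer.snapshot()); // {Partition 0=1}
    }
}
```

Keeping the guard inside the manager means callers such as the grouper do not need to track whether registration already happened.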



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
