[ https://issues.apache.org/jira/browse/SAMZA-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281601#comment-15281601 ]
Yi Pan (Data Infrastructure) commented on SAMZA-947:
----------------------------------------------------

+1. Merged and submitted. Thanks!

> TaskAssignmentManager registration exception when partition count changes.
> --------------------------------------------------------------------------
>
>                 Key: SAMZA-947
>                 URL: https://issues.apache.org/jira/browse/SAMZA-947
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.10.1
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>            Priority: Minor
>             Fix For: 0.10.1
>
>         Attachments: SAMZA-947_1.patch, SAMZA-947_2.patch
>
>
> The GroupByContainerCount grouper deletes the persisted task mapping if the
> partition count has changed, because there may now be fewer tasks and the
> old mapping would no longer be valid.
>
> To delete the mapping, the TaskAssignmentManager registers itself with the
> coordinator stream producer and writes null for all the keys. Later, when the
> recalculated mapping is saved, it tries to register itself again, which
> causes this exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
>     at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
>     at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
>     at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
>     at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
>     at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
>     at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
>     at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
>     at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
>     at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>
> In a YARN environment, the ApplicationMaster (AM) restarts, and since the
> task mapping has already been deleted, this second attempt to save the
> mapping succeeds.
>
> Since this issue occurs only when the partition count changes and is
> recoverable, I'm marking it as low priority.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
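The registration conflict described in the issue can be modeled in a few lines. The sketch below is a hypothetical Java model, not Samza's code and not the SAMZA-947 patch: `StrictProducer` stands in for a producer that rejects a second `register()` for the same source (as the stack trace shows `KafkaSystemProducer.register` doing), and `AssignmentManager` is an illustrative manager that guards registration with a flag, so deleting the old mapping and then saving the recalculated one registers only once. Requiring registration before `send()` is an assumption of the model.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the SAMZA-947 failure mode; these classes are NOT Samza's API.
public class RegisterOnceDemo {

    /** Models a producer that rejects a second register() for the same source. */
    static class StrictProducer {
        private final Set<String> sources = new HashSet<>();
        // Last value written per key; a null value means the key was deleted.
        final Map<String, String> log = new HashMap<>();

        void register(String source) {
            if (!sources.add(source)) {
                throw new IllegalStateException(source + " is already registered");
            }
        }

        // Assumption of the model: a source must register before it can send.
        void send(String source, String key, String value) {
            if (!sources.contains(source)) {
                throw new IllegalStateException(source + " is not registered");
            }
            log.put(key, value);
        }
    }

    /** Hypothetical assignment manager that registers lazily, at most once. */
    static class AssignmentManager {
        private static final String SOURCE = "TaskAssignmentManager";
        private final StrictProducer producer;
        private boolean registered = false;

        AssignmentManager(StrictProducer producer) {
            this.producer = producer;
        }

        // Single guarded registration point shared by delete and save.
        private void ensureRegistered() {
            if (!registered) {
                producer.register(SOURCE);
                registered = true;
            }
        }

        /** Deletes the old mapping by writing null for every task key. */
        void deleteAll(List<String> taskNames) {
            ensureRegistered();
            for (String task : taskNames) {
                producer.send(SOURCE, task, null);
            }
        }

        /** Saves the recalculated mapping; safe to call after deleteAll(). */
        void save(Map<String, String> taskToContainer) {
            ensureRegistered();
            for (Map.Entry<String, String> e : taskToContainer.entrySet()) {
                producer.send(SOURCE, e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        StrictProducer producer = new StrictProducer();
        AssignmentManager manager = new AssignmentManager(producer);
        // Partition count shrank from 3 to 2: delete the old mapping, then save the new one.
        manager.deleteAll(List.of("Partition 0", "Partition 1", "Partition 2"));
        manager.save(Map.of("Partition 0", "0", "Partition 1", "1"));
        System.out.println(producer.log);
    }
}
```

Without the `registered` flag, calling `register()` at the start of both `deleteAll()` and `save()` would reproduce the "already registered" failure on the second call. Registering once in the constructor would be an equally valid design in this model.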