> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > It seems that all we needed to fix the issue was to remove the line that
> > registers the coordinator stream consumer? Did I miss anything else?
>
> Jake Maes wrote:
>     The key change is to remove the register() call from
>     GroupByContainerCount.save....() and move it to the constructor.
>
>     It was problematic because, when the partition count changed, it
>     deleted the existing task mapping, which seems to register the producer;
>     then, when it recalculated and saved the new mapping, it tried to
>     register again, which caused the registration exception.
>
>     To me, it's just flimsy to pass (and REUSE!!!)
>     CoordinatorStreamSystemProducer and CoordinatorStreamSystemConsumer
>     instances into various implementations of
>     AbstractCoordinatorStreamManager and expect each of them to register the
>     same producer and consumer, especially since registering the consumer
>     has no effect in any implementation after the first.
>     So, now the TaskAssignmentManager registers itself with the producer
>     immediately and only once. It doesn't bother registering with the
>     consumer, since that's done first thing in the JobCoordinator.
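A minimal Java sketch of the register-once approach Jake describes. All names here are hypothetical, not Samza's API: `SketchProducer` stands in for CoordinatorStreamSystemProducer and is assumed to throw when the same source registers twice (modeling the registration exception described above), and `SketchTaskAssignmentManager`, `saveMapping` and `deleteMappings` are illustrative only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the coordinator stream producer. Registering the
// same source twice throws, mimicking the SAMZA-947 registration exception.
class SketchProducer {
  private final Set<String> sources = new HashSet<>();
  private final Map<String, String> store = new HashMap<>();

  void register(String source) {
    if (!sources.add(source)) {
      throw new IllegalStateException("Source already registered: " + source);
    }
  }

  // Writes key -> value; a null value deletes the key (tombstone-style).
  void send(String source, String key, String value) {
    if (!sources.contains(source)) {
      throw new IllegalStateException("Unregistered source: " + source);
    }
    if (value == null) {
      store.remove(key);
    } else {
      store.put(key, value);
    }
  }

  Map<String, String> snapshot() {
    return new HashMap<>(store);
  }
}

// Hypothetical manager: registers with the producer exactly once, in the
// constructor, so save/delete can be called in any order, any number of times.
class SketchTaskAssignmentManager {
  static final String SOURCE = "task-assignment-sketch";
  private final SketchProducer producer;

  SketchTaskAssignmentManager(SketchProducer producer) {
    this.producer = producer;
    producer.register(SOURCE);
  }

  void saveMapping(String task, String container) {
    producer.send(SOURCE, task, container); // no register() here
  }

  void deleteMappings(Iterable<String> tasks) {
    for (String task : tasks) {
      producer.send(SOURCE, task, null); // no register() here either
    }
  }
}

public class RegisterOnceDemo {
  public static void main(String[] args) {
    SketchProducer producer = new SketchProducer();
    SketchTaskAssignmentManager manager = new SketchTaskAssignmentManager(producer);
    manager.saveMapping("Partition 0", "0");
    manager.saveMapping("Partition 1", "1");

    // Partition count changed: delete the old mapping, then save the new one.
    // If delete and save each called register(), this would throw.
    manager.deleteMappings(Arrays.asList("Partition 0", "Partition 1"));
    manager.saveMapping("Partition 0", "2");

    System.out.println(producer.snapshot()); // {Partition 0=2}
  }
}
```

Because registration happens only in the constructor, the delete-then-save sequence triggered by a partition count change never calls register() a second time.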
Agreed. That brings back some old memories of this issue. A better approach
is to have a service that maintains the whole lifecycle of the coordinator
stream consumer and producer and simply provides them to the coordinator
stream managers. Let's do that refactor later.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
-----------------------------------------------------------


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> -----------------------------------------------------------
>
> (Updated May 11, 2016, 8:26 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
>
>
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-947 - TaskAssignmentManager registration exception when partition count
> changes.
>
> * Register the producer once at constructor time.
> * Don't bother registering the consumer. Its lifecycle is outside the
>   TaskAssignmentManager, and registering from the TAM ends up being a no-op.
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java 286ea1b3d0d39ebd7d9923a81c02c1d0842b1291
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java 0cbdec8ac050de18c2fea191e3ef38273f1dbab1
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java 19ab78e891ca22b6fba430ded6b9382c860a212d
>
> Diff: https://reviews.apache.org/r/47247/diff/
>
>
> Testing
> -------
>
> Manually tested with a job.
> Adjusted the input topics to induce a partition change and verified that no
> exceptions were thrown and that the TaskAssignmentManager cleanup ran as
> expected.
>
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does not match saved task count 512. Stateful jobs may observe misalignment of keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved from container 101 to container null
> ...
>
>
> Thanks,
>
> Jake Maes
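Yi's suggested follow-up refactor (a service that owns the whole coordinator stream consumer/producer lifecycle and simply hands the instances to the managers) might look roughly like the sketch below. It is not part of this review request, and every name in it (`Endpoint`, `CoordinatorStreamService`, `SketchManager`) is hypothetical rather than Samza's API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a coordinator stream consumer or producer.
class Endpoint {
  private final String name;
  private final Set<String> sources = new LinkedHashSet<>();
  private boolean started;

  Endpoint(String name) { this.name = name; }

  // Registration is only legal before start, and only once per source.
  void register(String source) {
    if (started) {
      throw new IllegalStateException(name + " is already started");
    }
    if (!sources.add(source)) {
      throw new IllegalStateException(source + " already registered with " + name);
    }
  }

  void start() { started = true; }
  void stop() { started = false; }
  boolean isStarted() { return started; }
  Set<String> sources() { return sources; }
}

// Hypothetical service: the single owner of register/start/stop for both
// endpoints. Managers only ever receive ready-to-use instances.
class CoordinatorStreamService implements AutoCloseable {
  private final Endpoint consumer = new Endpoint("consumer");
  private final Endpoint producer = new Endpoint("producer");

  CoordinatorStreamService(String... sources) {
    for (String source : sources) {
      consumer.register(source);
      producer.register(source);
    }
    consumer.start();
    producer.start();
  }

  Endpoint consumer() { return consumer; }
  Endpoint producer() { return producer; }

  @Override
  public void close() {
    producer.stop();
    consumer.stop();
  }
}

// Hypothetical manager: never calls register(), start() or stop() itself.
class SketchManager {
  private final Endpoint producer;

  SketchManager(CoordinatorStreamService service) {
    this.producer = service.producer();
  }

  boolean canWrite() { return producer.isStarted(); }
}

public class LifecycleServiceDemo {
  public static void main(String[] args) {
    try (CoordinatorStreamService service =
             new CoordinatorStreamService("job-coordinator", "task-assignment")) {
      // Sharing the service across managers is safe: nothing re-registers.
      SketchManager a = new SketchManager(service);
      SketchManager b = new SketchManager(service);
      System.out.println(a.canWrite() && b.canWrite()); // true
    }
  }
}
```

The design choice is that registration and start/stop happen in exactly one place, so passing the same instances to several managers can no longer cause duplicate registration.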