> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > It seems that all we need to fix the issue is to remove the line that
> > registers the coordinator stream consumer? Did I miss anything else?
> 
> Jake Maes wrote:
>     The key change is to remove the register() call from
>     GroupByContainerCount.save....() and move it to the constructor.
>     
>     It was problematic because, when the partition count changed, it
>     deleted the existing task mapping (which seems to register the
>     producer); then, when it recalculated and saved the new mapping, it
>     tried to register again, which caused the registration exception.
>     
>     To me, it's just flimsy to pass (and REUSE!!!)
>     CoordinatorStreamSystemProducer and CoordinatorStreamSystemConsumer
>     instances into various implementations of
>     AbstractCoordinatorStreamManager and expect each of them to register
>     the same producer and consumer, especially when every consumer
>     registration after the first has no effect. So now the
>     TaskAssignmentManager registers itself with the producer immediately,
>     and only once. It doesn't bother registering with the consumer, since
>     that's done first thing in the JobCoordinator.

Agreed. That brings back some old memories of this issue. The better approach
is to have a service maintain the whole lifecycle of the coordinator stream
consumer and producer and simply provide the consumer/producer to the
coordinator stream managers. Let's do the refactor later.
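A minimal sketch of that refactor idea, for illustration only. Every name here (`SharedProducer`, `CoordinatorStreamService`, `Manager`, `producerFor`) is hypothetical and not Samza's API; only the producer side is shown (a consumer would be handled the same way), and the producer is assumed to require each source to register exactly once, before `start()`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical shared producer: each source must register once, before start().
class SharedProducer {
    private final Set<String> sources = new HashSet<>();
    private final List<String> sent = new ArrayList<>();
    private boolean started;

    void register(String source) {
        if (started) {
            throw new IllegalStateException("Register after start: " + source);
        }
        if (!sources.add(source)) {
            throw new IllegalStateException("Duplicate source: " + source);
        }
    }

    void start() { started = true; }

    void stop() { started = false; }

    void send(String source, String message) {
        if (!started || !sources.contains(source)) {
            throw new IllegalStateException("Producer not ready for " + source);
        }
        sent.add(source + ": " + message);
    }

    List<String> sent() { return sent; }
}

// Hypothetical service owning the producer's whole lifecycle. Managers never
// call register/start/stop; they only receive a ready-to-use producer.
class CoordinatorStreamService implements AutoCloseable {
    private final SharedProducer producer = new SharedProducer();

    // Each manager's source is registered here, once, before start().
    SharedProducer producerFor(String source) {
        producer.register(source);
        return producer;
    }

    void start() { producer.start(); }

    @Override
    public void close() { producer.stop(); }
}

// Hypothetical manager that only sends through the producer it is given.
class Manager {
    private final String source;
    private final SharedProducer producer;

    Manager(String source, SharedProducer producer) {
        this.source = source;
        this.producer = producer;
    }

    void write(String message) { producer.send(source, message); }
}

public class LifecycleServiceDemo {
    public static void main(String[] args) {
        try (CoordinatorStreamService service = new CoordinatorStreamService()) {
            SharedProducer producer = service.producerFor("TaskAssignmentManager");
            Manager tasks = new Manager("TaskAssignmentManager", producer);
            Manager locality = new Manager("LocalityManager", service.producerFor("LocalityManager"));
            service.start();
            tasks.write("Partition 15 -> 3");
            locality.write("container 3 -> host-a");
            System.out.println(producer.sent());
        }
    }
}
```

Managers only ever call `send()`, so neither a delete path nor a save path can register a second time. Registration mistakes surface in one place: the service.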


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
-----------------------------------------------------------


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> -----------------------------------------------------------
> 
> (Updated May 11, 2016, 8:26 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
>     https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. Its lifecycle is managed outside the
> TaskAssignmentManager, and registering from the TAM ends up being a no-op.
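The failure mode and the fix described above can be sketched in Java. This is not the Samza code. `StrictProducer`, `RegisterPerCallManager`, `RegisterOnceManager`, `deleteMapping`, and `writeMapping` are hypothetical names, and the producer is assumed to throw on a second `register()` of the same source. That assumption models the registration exception reported in SAMZA-947.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical producer that rejects a second register() for the same source.
class StrictProducer {
    private final Set<String> sources = new HashSet<>();
    private final List<String> sent = new ArrayList<>();

    void register(String source) {
        if (!sources.add(source)) {
            throw new IllegalStateException("Source already registered: " + source);
        }
    }

    void send(String source, String message) {
        if (!sources.contains(source)) {
            throw new IllegalStateException("Unregistered source: " + source);
        }
        sent.add(message);
    }

    List<String> sent() {
        return sent;
    }
}

// Before the fix (hypothetical): each write path registers the producer itself.
class RegisterPerCallManager {
    private static final String SOURCE = "TaskAssignmentManager";
    private final StrictProducer producer;

    RegisterPerCallManager(StrictProducer producer) {
        this.producer = producer;
    }

    void deleteMapping(String task) {
        producer.register(SOURCE); // first registration succeeds
        producer.send(SOURCE, "delete " + task);
    }

    void writeMapping(String task, int container) {
        producer.register(SOURCE); // second registration throws
        producer.send(SOURCE, task + " -> " + container);
    }
}

// After the fix (hypothetical): register exactly once, at construction time.
class RegisterOnceManager {
    private static final String SOURCE = "TaskAssignmentManager";
    private final StrictProducer producer;

    RegisterOnceManager(StrictProducer producer) {
        this.producer = producer;
        producer.register(SOURCE);
    }

    void deleteMapping(String task) {
        producer.send(SOURCE, "delete " + task);
    }

    void writeMapping(String task, int container) {
        producer.send(SOURCE, task + " -> " + container);
    }
}

public class RegisterOnceDemo {
    public static void main(String[] args) {
        // Partition count changed: delete the old mapping, then save the new one.
        RegisterPerCallManager before = new RegisterPerCallManager(new StrictProducer());
        before.deleteMapping("Partition 15");
        try {
            before.writeMapping("Partition 15", 3);
        } catch (IllegalStateException e) {
            System.out.println("before fix: " + e.getMessage());
        }

        StrictProducer producer = new StrictProducer();
        RegisterOnceManager after = new RegisterOnceManager(producer);
        after.deleteMapping("Partition 15");
        after.writeMapping("Partition 15", 3);
        System.out.println("after fix: " + producer.sent());
    }
}
```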
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java 286ea1b3d0d39ebd7d9923a81c02c1d0842b1291
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java 0cbdec8ac050de18c2fea191e3ef38273f1dbab1
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java 19ab78e891ca22b6fba430ded6b9382c860a212d
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> -------
> 
> Manually tested with a job. Adjusted the input topics to induce a partition
> count change and verified that no exceptions were thrown and that the
> TaskAssignmentManager cleanup ran as expected.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does not match saved task count 512. Stateful jobs may observe misalignment of keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
