Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Jake Maes


> On May 11, 2016, 9:18 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java,
> >  line 42
> > 
> >
> > I guess that currently TaskAssignmentManager is still accessed via 
> > single thread? It is fine for now. But we should evaluate the need to make 
> > this thread safe when integrating with Xinyu's change.

TaskAssignmentManager should only be used in the JobCoordinator to 
refreshJobModel. So unless we have multiple threads refreshing the job model, 
it should be fine. And if we do, we should consider whether the job model 
refresh should be synchronous.


- Jake


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132749
---


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 8:26 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132749
---


Ship it!




lgtm. +1.


samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 (line 42)


I guess that currently TaskAssignmentManager is still accessed via single 
thread? It is fine for now. But we should evaluate the need to make this thread 
safe when integrating with Xinyu's change.


- Yi Pan (Data Infrastructure)


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 8:26 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Yi Pan (Data Infrastructure)


> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > It seems that all we needed to fix the issue is to remove the line to 
> > register the coordinator stream consumer? Did I miss any other things?
> 
> Jake Maes wrote:
> The key change is to remove the register() call from 
> GroupByContainerCount.save() and move it to the constructor. 
> 
> It was problematic because when the partition count would change, it 
> deletes the existing task mapping, which seems to register the producer, then 
> when it recalculates and saves the new mapping, it tries to register again 
> and this causes the registration exception. 
> 
> To me, it's just flimsy to pass (and REUSE!!!) 
> CoordinatorStreamSystemProducer and CoordinatorStreamSystemConsumer instances 
> into various implementations of AbstractCoordinatorStreamManager and expect 
> each of them to register the same producer and consumer, especially when each 
> implementation (after the first) that registers the consumer has no effect. 
> So, now the TaskAssignmentManager registers itself with the producer 
> immediately and only once. It doesn't bother registering for consumer, since 
> that's done first thing in the JobCoordinator.

Agreed. That remind me of some old memory on this issue. The better approach is 
to have a service maintaining the whole lifecycle of coordinator stream 
consumer and producer and simply provide the consumer/producer to the 
coordinator stream managers. Let's do the refactor later.


- Yi


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
---


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 8:26 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Jake Maes


> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > It seems that all we needed to fix the issue is to remove the line to 
> > register the coordinator stream consumer? Did I miss any other things?

The key change is to remove the register() call from 
GroupByContainerCount.save() and move it to the constructor. 

It was problematic because when the partition count would change, it deletes 
the existing task mapping, which seems to register the producer, then when it 
recalculates and saves the new mapping, it tries to register again and this 
causes the registration exception. 

To me, it's just flimsy to pass (and REUSE!!!) CoordinatorStreamSystemProducer 
and CoordinatorStreamSystemConsumer instances into various implementations of 
AbstractCoordinatorStreamManager and expect each of them to register the same 
producer and consumer, especially when each implementation (after the first) 
that registers the consumer has no effect. So, now the TaskAssignmentManager 
registers itself with the producer immediately and only once. It doesn't bother 
registering for consumer, since that's done first thing in the JobCoordinator.


> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java,
> >  line 53
> > 
> >
> > nit: Is this to avoid the call to register() with null input argument? 
> > It does not seem to be absolutely needed and makes this 
> > CoordinatorStreamManager's lifecycle different from others.

Explained above


- Jake


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
---


On May 11, 2016, 8:26 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 8:26 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/
---

(Updated May 11, 2016, 8:26 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-947
https://issues.apache.org/jira/browse/SAMZA-947


Repository: samza


Description
---

SAMZA-947 - TaskAssignmentManager registration exception when partition count 
changes.

* Register the producer once at constructor time
* Don't bother registering the consumer. It's lifecycle is outside the 
TaskAssignmentManager and registering from the TAM ends up being a no-op.


Diffs (updated)
-

  
samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
 286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
  
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
  
samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
 19ab78e891ca22b6fba430ded6b9382c860a212d 

Diff: https://reviews.apache.org/r/47247/diff/


Testing
---

Manually tested with a job. Adjusted the input topics to induce a partition 
change and verified no exceptions and the TaskAssignmentManager cleanup ran 
appropriately.

2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does not 
match saved task count 512. Stateful jobs may observe misalignment of keys!
...
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved from 
container 57 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved from 
container 46 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved from 
container 35 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved from 
container 101 to container null
...


Thanks,

Jake Maes



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Jake Maes


> On May 11, 2016, 8:07 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java,
> >  line 58
> > 
> >
> > There is no place that we change the registered flag. Why do we need to 
> > have it if registered is always false?

This is a bug from when I lost and reapplied the change. 

There's also a file missing from the patch. Let me fix and reupload and we then 
you can take another look. Apologies for the churn.


- Jake


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
---


On May 11, 2016, 6:19 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 6:19 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/#review132737
---



It seems that all we needed to fix the issue is to remove the line to register 
the coordinator stream consumer? Did I miss any other things?


samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 (line 53)


nit: Is this to avoid the call to register() with null input argument? It 
does not seem to be absolutely needed and makes this CoordinatorStreamManager's 
lifecycle different from others.



samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 (line 58)


There is no place that we change the registered flag. Why do we need to 
have it if registered is always false?


- Yi Pan (Data Infrastructure)


On May 11, 2016, 6:19 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47247/
> ---
> 
> (Updated May 11, 2016, 6:19 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-947
> https://issues.apache.org/jira/browse/SAMZA-947
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-947 - TaskAssignmentManager registration exception when partition count 
> changes.
> 
> * Register the producer once at constructor time
> * Don't bother registering the consumer. It's lifecycle is outside the 
> TaskAssignmentManager and registering from the TAM ends up being a no-op.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  19ab78e891ca22b6fba430ded6b9382c860a212d 
> 
> Diff: https://reviews.apache.org/r/47247/diff/
> 
> 
> Testing
> ---
> 
> Manually tested with a job. Adjusted the input topics to induce a partition 
> change and verified no exceptions and the TaskAssignmentManager cleanup ran 
> appropriately.
> 
> 2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does 
> not match saved task count 512. Stateful jobs may observe misalignment of 
> keys!
> ...
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved 
> from container 57 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved 
> from container 46 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved 
> from container 35 to container null
> 2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved 
> from container 101 to container null
> ...
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Review Request 47247: SAMZA-947 - TaskAssignmentManager registration exception when partition count changes.

2016-05-11 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47247/
---

Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-947
https://issues.apache.org/jira/browse/SAMZA-947


Repository: samza


Description
---

SAMZA-947 - TaskAssignmentManager registration exception when partition count 
changes.

* Register the producer once at constructor time
* Don't bother registering the consumer. It's lifecycle is outside the 
TaskAssignmentManager and registering from the TAM ends up being a no-op.


Diffs
-

  
samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
 286ea1b3d0d39ebd7d9923a81c02c1d0842b1291 
  
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 0cbdec8ac050de18c2fea191e3ef38273f1dbab1 
  
samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
 19ab78e891ca22b6fba430ded6b9382c860a212d 

Diff: https://reviews.apache.org/r/47247/diff/


Testing
---

Manually tested with a job. Adjusted the input topics to induce a partition 
change and verified no exceptions and the TaskAssignmentManager cleanup ran 
appropriately.

2016-05-11 17:34:33 GroupByContainerCount [WARN] Current task count 32 does not 
match saved task count 512. Stateful jobs may observe misalignment of keys!
...
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 15" moved from 
container 57 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 14" moved from 
container 46 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 13" moved from 
container 35 to container null
2016-05-11 17:34:34 TaskAssignmentManager [INFO] Task "Partition 19" moved from 
container 101 to container null
...


Thanks,

Jake Maes