-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


Overall looks good to me. I have a few documentation/code reorganization 
suggestions. Thanks a lot!


samza-core/src/main/java/org/apache/samza/container/LocalityManager.java (line 
82)
<https://reviews.apache.org/r/37817/#comment152777>

    What's the reason for having both hostname and hostip here? Some javadoc
    would be nice.



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 24)
<https://reviews.apache.org/r/37817/#comment152820>

    nit: would be nice to add javadoc here.



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 51)
<https://reviews.apache.org/r/37817/#comment152821>

    Just curious: why are we choosing 3.6s as the sleep time here? Is it from
    experience?
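    Whatever the answer is, it may help to give the value a named default with
    a short note on where it came from. This is a minimal sketch. It assumes a
    Map-backed config and the key name `yarn.allocator.sleep.ms`; both are
    assumptions, and the patch's YarnConfig may name and parse the value
    differently.

```java
import java.util.Map;

/** Hypothetical config wrapper; Samza's real Config class is not used here. */
class AllocatorConfigSketch {
  // Assumed key name, for illustration only.
  static final String ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";

  /**
   * Default sleep between allocator passes, in milliseconds (the 3.6s above).
   * TODO: note where 3600 ms comes from so future readers know whether to tune it.
   */
  static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;

  private final Map<String, String> config;

  AllocatorConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  /** Returns the configured sleep time, or the documented default when unset. */
  int getAllocatorSleepTime() {
    String value = config.get(ALLOCATOR_SLEEP_MS);
    return value == null ? DEFAULT_ALLOCATOR_SLEEP_MS : Integer.parseInt(value.trim());
  }
}
```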



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 99)
<https://reviews.apache.org/r/37817/#comment152822>

    Do we need a default value here as well? What is the right behavior if this
    config variable is not set? Do we always fall back to whatever the default
    Java is on the AM machine? That sounds a bit concerning to me.
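    One option is to make the unset case an explicit, documented choice. In
    the sketch below, both the key `yarn.am.java.home` and the fallback to
    plain `java` on the AM host's PATH are assumptions for illustration. They
    do not describe what the patch does.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical config wrapper showing an explicit fallback for an unset JAVA_HOME. */
class AmJavaConfigSketch {
  // Assumed key name, for illustration only.
  static final String AM_JAVA_HOME = "yarn.am.java.home";

  private final Map<String, String> config;

  AmJavaConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  /** Empty when no JAVA_HOME was configured for the AM. */
  Optional<String> getAmJavaHome() {
    String value = config.get(AM_JAVA_HOME);
    return (value == null || value.trim().isEmpty()) ? Optional.empty() : Optional.of(value.trim());
  }

  /** Java binary used for the AM; the unset case is spelled out rather than implicit. */
  String javaBinary() {
    return getAmJavaHome()
        .map(home -> home + "/bin/java")
        .orElse("java"); // hypothetical policy: whatever "java" resolves to on the AM host's PATH
  }
}
```

    Whether a fallback is acceptable at all, as opposed to failing fast, is
    exactly the open question above.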



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 32)
<https://reviews.apache.org/r/37817/#comment152823>

    nit: javadoc.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 37)
<https://reviews.apache.org/r/37817/#comment152824>

    For better code reuse and readability, would it be worth creating two
    derived classes instead of using a boolean flag in this class?
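    Roughly, the shared plumbing would stay in an abstract base class and each
    subclass would supply only the matching rule. All class names here are
    hypothetical, and `String` host names stand in for real requests and
    containers.

```java
import java.util.Deque;

/** Hypothetical request type: only the preferred host matters for this sketch. */
final class RequestSketch {
  final String preferredHost;

  RequestSketch(String preferredHost) {
    this.preferredHost = preferredHost;
  }
}

/** Shared logic lives here; subclasses replace the boolean flag. */
abstract class AbstractAllocatorSketch {
  static final String ANY_HOST = "ANY_HOST";

  /** Returns the host the request should be satisfied on. */
  abstract String targetHost(RequestSketch request);

  /** Takes the oldest pending request and returns its target host, or null if none. */
  final String assign(Deque<RequestSketch> pending) {
    RequestSketch next = pending.poll();
    return next == null ? null : targetHost(next);
  }
}

/** Replaces the "host affinity disabled" branch. */
class ContainerAllocatorSketch extends AbstractAllocatorSketch {
  @Override
  String targetHost(RequestSketch request) {
    return ANY_HOST;
  }
}

/** Replaces the "host affinity enabled" branch. */
class HostAwareContainerAllocatorSketch extends AbstractAllocatorSketch {
  @Override
  String targetHost(RequestSketch request) {
    return request.preferredHost;
  }
}
```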



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 52)
<https://reviews.apache.org/r/37817/#comment152829>

    It would be good to document the overall threading model here. These seem
    to be the main threads involved (correct me if I am wrong):
    - The main thread, which drives the AM to send container requests to the RM
    - The allocator thread here, which assigns the allocated containers to
      pending requests
    - The callback handler thread, which receives responses from the RM and
      populates the allocated containers collection in containerRequestState
    - The SamzaTaskManager handler thread, which handles container failures and
      re-requests containers from the RM
    
    It would be nice to document this here so that we have a clear picture of
    which data structures are shared among which threads.
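    For example, a class-level javadoc roughly like the one below. The thread
    list is taken from the comment above. The field names, the single `lock`
    object and the one-pass `run()` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Threading model (sketch; to be confirmed against the actual implementation):
 * <ul>
 *   <li>AM main thread: sends container requests to the RM.</li>
 *   <li>Allocator thread ({@link #run()}): assigns allocated containers to pending requests.</li>
 *   <li>Callback handler thread: receives RM responses and adds allocated containers.</li>
 *   <li>SamzaTaskManager handler thread: handles failures and re-requests containers.</li>
 * </ul>
 * Both lists below are touched by more than one of these threads, so every access holds {@code lock}.
 */
class ThreadingModelDocSketch implements Runnable {
  private final Object lock = new Object();

  /** Written by the callback thread, drained by the allocator thread. Guarded by {@code lock}. */
  private final List<String> allocatedContainers = new ArrayList<>();

  /** Written by the main and failure-handler threads, drained by the allocator. Guarded by {@code lock}. */
  private final List<String> pendingRequests = new ArrayList<>();

  void addAllocatedContainer(String containerId) {
    synchronized (lock) {
      allocatedContainers.add(containerId);
    }
  }

  void addPendingRequest(String request) {
    synchronized (lock) {
      pendingRequests.add(request);
    }
  }

  int pendingCount() {
    synchronized (lock) {
      return pendingRequests.size();
    }
  }

  /** One allocation pass: pairs containers with requests while holding the lock. */
  @Override
  public void run() {
    synchronized (lock) {
      while (!allocatedContainers.isEmpty() && !pendingRequests.isEmpty()) {
        allocatedContainers.remove(0);
        pendingRequests.remove(0);
      }
    }
  }
}
```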



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 71)
<https://reviews.apache.org/r/37817/#comment152826>

    Quick question: the code here seems to take the available containers at
    the beginning of the loop and then loop through the pending requests for
    allocation. What if the callback handler adds to availableContainers at
    the same time? Is the List<Container> thread-safe? I think the code still
    works because of the producer/consumer model on the same structure, as
    long as the remove/add of the first element in the List<> do not collide
    with each other.
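    For reference, `java.util.ArrayList` gives no guarantees under concurrent
    add/remove. The pattern is only safe if the callback handler and the
    allocator hold the same lock around every access. A concurrent queue avoids
    the external lock. Below is a minimal sketch with `ConcurrentLinkedQueue`,
    in which `String` IDs stand in for YARN `Container` objects.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ContainerQueueSketch {
  /** Callback thread offers, allocator thread polls; both are safe without an external lock. */
  final ConcurrentLinkedQueue<String> availableContainers = new ConcurrentLinkedQueue<>();

  /** Runs one producer and one consumer concurrently; returns how many containers were consumed. */
  static int runDemo(int n) {
    ContainerQueueSketch s = new ContainerQueueSketch();
    AtomicInteger consumed = new AtomicInteger();

    Thread callback = new Thread(() -> {
      for (int i = 0; i < n; i++) {
        s.availableContainers.offer("container_" + i);
      }
    });
    Thread allocator = new Thread(() -> {
      while (consumed.get() < n) {
        // poll() returns null (rather than throwing) when the queue is momentarily empty.
        if (s.availableContainers.poll() != null) {
          consumed.incrementAndGet();
        }
      }
    });

    callback.start();
    allocator.start();
    try {
      callback.join();
      allocator.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return consumed.get();
  }
}
```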



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 80)
<https://reviews.apache.org/r/37817/#comment152825>

    Would it be better to wrap this together with
    containerRequestState.updateStateAfterAssignment()? If I understand
    correctly, this step makes sure that the request queue in
    containerRequestState stays in sync with the buffered requests in amClient.
    I think containerRequestState is also updated when a request is sent
    asynchronously via amClient. It seems to me that these would be better
    paired up in two API functions, one for adding a request and one for
    removing it.
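    Here is a rough sketch of the pairing. `RmRequestBuffer` is a hypothetical
    stand-in for the `addContainerRequest`/`removeContainerRequest` calls on
    Hadoop's `AMRMClient`, and the method names are illustrative only. Both
    methods lock the same object, so the client buffer and the local queue
    cannot drift apart.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the add/remove request calls on AMRMClient. */
interface RmRequestBuffer {
  void addContainerRequest(String request);

  void removeContainerRequest(String request);
}

class PairedRequestStateSketch {
  private final RmRequestBuffer amClient;
  private final Deque<String> pendingRequests = new ArrayDeque<>();

  PairedRequestStateSketch(RmRequestBuffer amClient) {
    this.amClient = amClient;
  }

  /** Records the request locally and buffers it in the client under one lock. */
  synchronized void issueRequest(String request) {
    pendingRequests.addLast(request);
    amClient.addContainerRequest(request);
  }

  /** After an assignment, drops the oldest request from both places together. */
  synchronized String completeOldestRequest() {
    String request = pendingRequests.pollFirst();
    if (request != null) {
      amClient.removeContainerRequest(request);
    }
    return request;
  }

  synchronized int pendingCount() {
    return pendingRequests.size();
  }
}
```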



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 96)
<https://reviews.apache.org/r/37817/#comment152827>

    At this point, have we truly run out of containers, or do we simply need to
    wait for more containers from the RM? I don't think we can tell here. Would
    it be better to log info "Waiting %s more seconds for more containers to be
    allocated"?
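    For example, something like the sketch below. It uses java.util.logging to
    stay self-contained. The millisecond timeout and elapsed parameters are
    assumptions about how the wait is tracked.

```java
import java.util.logging.Logger;

class WaitingLogSketch {
  private static final Logger LOG = Logger.getLogger(WaitingLogSketch.class.getName());

  /** Message for the case where no container has been allocated yet; never negative. */
  static String waitingMessage(long requestTimeoutMs, long elapsedMs) {
    long remainingMs = Math.max(0, requestTimeoutMs - elapsedMs);
    return String.format("Waiting %d more ms for containers to be allocated", remainingMs);
  }

  static void logWaiting(long requestTimeoutMs, long elapsedMs) {
    LOG.info(waitingMessage(requestTimeoutMs, elapsedMs));
  }
}
```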



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 138)
<https://reviews.apache.org/r/37817/#comment152828>

    It seems that we could have a common thread class within ContainerAllocator
    and two different classes implementing the non-host-affinity and
    host-affinity algorithms. Worth considering?
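    Roughly, one `Runnable` would own the loop, the sleep and the stop flag,
    and would delegate each pass to a strategy object. `AllocationStrategy` and
    `AllocatorLoopSketch` are hypothetical names. `sleepMs` would come from the
    configured allocator sleep time.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** One pass of matching allocated containers to pending requests. */
interface AllocationStrategy {
  void assignContainerRequests();
}

/** Common allocator thread: loop, sleep and stop handling are shared by both algorithms. */
class AllocatorLoopSketch implements Runnable {
  private final AllocationStrategy strategy;
  private final long sleepMs;
  private final AtomicBoolean running = new AtomicBoolean(true);

  AllocatorLoopSketch(AllocationStrategy strategy, long sleepMs) {
    this.strategy = strategy;
    this.sleepMs = sleepMs;
  }

  @Override
  public void run() {
    while (running.get()) {
      strategy.assignContainerRequests();
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and exit the loop
        return;
      }
    }
  }

  void stop() {
    running.set(false);
  }
}
```

    The two algorithms would then just be two `AllocationStrategy`
    implementations, e.g. `new Thread(new AllocatorLoopSketch(hostAffinityStrategy, sleepMs))`,
    where `hostAffinityStrategy` is likewise hypothetical.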



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 158)
<https://reviews.apache.org/r/37817/#comment152858>

    Same comments here as for the non-host-affinity allocator.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 301)
<https://reviews.apache.org/r/37817/#comment152876>

    If amClient.addContainerRequest() is in the same sync block as
    containerRequestState.updateRequestState() here, I would expect
    amClient.removeContainerRequest() to be in the same sync block as
    containerRequestState.updateRequestState in the allocator threads too.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
(line 314)
<https://reviews.apache.org/r/37817/#comment152891>

    Just a thought: maybe declare ContainerRequestState.addContainer() as a
    synchronized method directly, if it always requires the global lock on
    containerRequestState?
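    For what it's worth, a `synchronized` instance method locks `this`. It
    therefore uses the same monitor as external `synchronized
    (containerRequestState) { ... }` blocks, so the two styles can coexist. A
    minimal sketch with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;

class SynchronizedStateSketch {
  private final List<String> allocatedContainers = new ArrayList<>();

  /** Equivalent to wrapping the body in synchronized (this) { ... }. */
  synchronized void addContainer(String containerId) {
    allocatedContainers.add(containerId);
  }

  /** Same monitor as addContainer, written in block form. */
  int size() {
    synchronized (this) {
      return allocatedContainers.size();
    }
  }
}
```

    Compound operations that span several calls would still need the external
    block.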



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 34)
<https://reviews.apache.org/r/37817/#comment152879>

    This is defined redundantly in both ContainerAllocator and here. It would
    be good to keep just one.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 57)
<https://reviews.apache.org/r/37817/#comment152882>

    I see an opportunity to remove this conditional flag and split this state
    into two derived classes as well. Thoughts?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 68)
<https://reviews.apache.org/r/37817/#comment152884>

    nit: it would be good to also mention under which lock the following
    operation is performed.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 94)
<https://reviews.apache.org/r/37817/#comment152886>

    nit: Under which lock is this method called? It would help readability to
    add this information to the javadoc.
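    The requirement could also be checked at runtime with
    `Thread.holdsLock(Object)`, which returns true only if the current thread
    holds that monitor. In the sketch below, the method body and the counter
    field are hypothetical.

```java
class LockDocumentedStateSketch {
  private int pendingRequests;

  /**
   * Decrements the pending-request count after an assignment.
   * <p>Must be called while holding the monitor of this object, i.e. inside
   * {@code synchronized (containerRequestState) { ... }}.
   */
  void updateStateAfterAssignment() {
    if (!Thread.holdsLock(this)) {
      throw new IllegalStateException("caller must hold the lock on " + this);
    }
    pendingRequests--;
  }

  synchronized void addRequest() {
    pendingRequests++;
  }

  synchronized int pendingRequests() {
    return pendingRequests;
  }
}
```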



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 112)
<https://reviews.apache.org/r/37817/#comment152889>

    So, to make sure that the request count and the allocated containers count
    do not change during the comparison, reading and updating both variables
    needs to be atomic. Is that guaranteed by the global lock on the state?
    
    Or, I guess the worst outcome here is that requestCountOnThisHost has not
    been updated yet when the container is allocated, so one allocated
    container goes to ANY_HOST and misses the chance to be allocated to the
    preferred host? We may be able to live with this if we want to avoid
    locking here for performance. It would be good to add a note here.
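    If the comparison does need to be exact, reading both counts and choosing
    the bucket can happen in one critical section. Below is a sketch of that
    check-then-act under a single lock. The per-host maps and method names are
    assumptions loosely based on this comment, not the patch's data structures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HostBucketsSketch {
  static final String ANY_HOST = "ANY_HOST";

  private final Map<String, Integer> requestCountPerHost = new HashMap<>();
  private final Map<String, List<String>> allocatedPerHost = new HashMap<>();

  synchronized void addRequest(String host) {
    requestCountPerHost.merge(host, 1, Integer::sum);
  }

  /**
   * Reads both counts and places the container under one lock, so no other
   * thread can change either value in between. Returns the chosen bucket.
   */
  synchronized String addContainer(String host, String containerId) {
    int requestCountOnThisHost = requestCountPerHost.getOrDefault(host, 0);
    int allocatedOnThisHost = allocatedPerHost.getOrDefault(host, Collections.emptyList()).size();
    String bucket = allocatedOnThisHost < requestCountOnThisHost ? host : ANY_HOST;
    allocatedPerHost.computeIfAbsent(bucket, h -> new ArrayList<>()).add(containerId);
    return bucket;
  }
}
```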



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
(line 136)
<https://reviews.apache.org/r/37817/#comment152892>

    It would be good to comment on which lock this method should be called
    under.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala (line 
19)
<https://reviews.apache.org/r/37817/#comment152901>

    Any reason this class is still in Scala? If it is new/refactored, would it
    make more sense to change it to Java?



samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 (line 173)
<https://reviews.apache.org/r/37817/#comment152902>

    I thought the purpose of these tests is orthogonal to the host-affinity
    change, so they would need to be kept here?


- Yi Pan (Data Infrastructure)


On Aug. 26, 2015, 10:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2015, 10:14 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi 
> Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It 
> uses a threaded model for request and allocation. More comments in the 
> javadoc. 
> 
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that 
> it delegates onContainerAllocated and requestContainers to the thread in 
> ContainerAllocator 
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. host-affinity can be enabled by using the config 
> "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with 
> the FairScheduler when continuous scheduling is enabled. Details on this 
> config can be found in the SAMZA-617 
> [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
> 2. Update config documentation 
> 3. Update web-site with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 
> 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
> PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
> ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala 
> PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> af42c6a6636953a95f79837fe372e0dbd735df70 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
>  d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  03acfe1bbbabf8f54be9f36fdae785476da45135 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
>  060538623e4d67b986bc635518e7fe8ebdde9e24 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
> 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
>  7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
>  df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
>  6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3 node Yarn 
> cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
