We do get the list of nodes and their available resources when the AM starts.
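For illustration, here is a minimal standalone sketch of how an AM-side process can fetch that node list and the per-node resources through the YARN client API (the class name is made up; the actual Apex code is at the line referenced below):

    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.NodeState;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ListClusterNodes
    {
      public static void main(String[] args) throws Exception
      {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
          // Ask the ResourceManager for all nodes currently in the RUNNING state.
          for (NodeReport node : yarnClient.getNodeReports(NodeState.RUNNING)) {
            System.out.println(node.getNodeId()
                + " capability=" + node.getCapability() // total resources on the node
                + " used=" + node.getUsed());           // resources currently in use
          }
        } finally {
          yarnClient.stop();
        }
      }
    }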
Look at line 693 in the StreamingAppMasterService.java class: https://github.com/apache/incubator-apex-core/blob/devel-3/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java

Thanks
- Gaurav

> On Jan 19, 2016, at 2:19 PM, Isha Arkatkar <[email protected]> wrote:
>
> Hi Gaurav,
>
> I think Pramod agreed with approach 2a.
> For approach 2b, I think we will not initially have the set of nodes available in the cluster, so we need to try the greedy approach of getting containers on different nodes to build a mapping. Once we have the mapping, though, we can send requests for specific nodes. I think 2b is more applicable to the container re-allocation scenario, if the node-locality request is honored by Yarn. I can test this out.
>
> Pramod, could you please elaborate on anti-cluster affinity?
>
> Thanks,
> Isha
>
> On Tue, Jan 19, 2016 at 2:14 PM, Thomas Weise <[email protected]> wrote:
>
>> Gaurav,
>>
>> Does the request for a specific node work on the FairScheduler now?
>>
>> On Tue, Jan 19, 2016 at 2:06 PM, Gaurav Gupta <[email protected]> wrote:
>>
>>> I agree with Pramod that we should go with 2b, and we are already doing node locality, so you can use that feature.
>>> Regarding 3, do we need to support relaxed anti-affinity? Anti-affinity will mostly be used where the user wants such segregation of operators onto different nodes for his/her app.
>>>
>>> Thanks
>>> - Gaurav
>>>
>>>> On Jan 19, 2016, at 1:57 PM, Pramod Immaneni <[email protected]> wrote:
>>>>
>>>> Sorry, I meant distro agnostic (without the not) in the first sentence.
>>>>
>>>> On Tue, Jan 19, 2016 at 1:57 PM, Pramod Immaneni <[email protected]> wrote:
>>>>
>>>>> Isha, this sounds great. 2a sounds like a good approach that is not distro agnostic. How about also supporting a minor variation of it as an option, where it greedily gets the total number of containers, discards the ones it can't use, and repeats the process for the remainder until everything has been allocated? Also, does it make sense to support anti-cluster affinity?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Tue, Jan 19, 2016 at 1:21 PM, Isha Arkatkar <[email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We want to add support for anti-affinity in Apex, to allow applications to launch specific physical operators on different nodes (APEXCORE-10 <https://issues.apache.org/jira/browse/APEXCORE-10>). I want to request your suggestions/ideas for the same!
>>>>>>
>>>>>> The reasons for using anti-affinity between operators could be: to ensure reliability, for performance (for example, an application may not want two I/O-intensive operators to land on the same node), or for application-specific constraints (for example, two partitions cannot run on the same node because they use the same port number). This is the general rationale for adding anti-affinity support.
>>>>>>
>>>>>> Since Yarn does not support anti-affinity yet (YARN-1042 <https://issues.apache.org/jira/browse/YARN-1042>), we need to implement the logic in the AM. I wanted to get your views on the following aspects of this implementation:
>>>>>>
>>>>>> *1. How to specify anti-affinity for physical operators/partitions in an application:*
>>>>>> One way is to have an attribute for setting anti-affinity in the logical operator context.
>>>>>> An operator can then set this attribute to a list of operator names that should not be co-located with it.
>>>>>> Consider a DAG with 3 operators:
>>>>>>
>>>>>> TestOperator o1 = dag.addOperator("O1", new TestOperator());
>>>>>> TestOperator o2 = dag.addOperator("O2", new TestOperator());
>>>>>> TestOperator o3 = dag.addOperator("O3", new TestOperator());
>>>>>>
>>>>>> To set anti-affinity for the O1 operator:
>>>>>>
>>>>>> dag.setAttribute(o1, OperatorContext.ANTI_AFFINITY, new ArrayList<String>(Arrays.asList("O2", "O3")));
>>>>>>
>>>>>> This means O1 should not be allocated on nodes containing operators O2 and O3. This applies to all allocated partitions of O1, O2 and O3.
>>>>>>
>>>>>> Also, if an operator's own name is part of its anti-affinity list, it means partitions of that operator should not be allocated on the same node. For example:
>>>>>>
>>>>>> dag.setAttribute(o2, OperatorContext.ANTI_AFFINITY, new ArrayList<String>(Arrays.asList("O2")));
>>>>>>
>>>>>> This indicates anti-affinity between all partitions of O2, i.e. all partitions of O2 should be launched on different nodes.
>>>>>>
>>>>>> Based on the anti-affinity attribute specified for a logical operator, we can add this list to each PTContainer during physical plan creation. This in turn will be available to Stram for sending container requests accordingly.
>>>>>>
>>>>>> Please suggest if there is a better way to express this intent.
>>>>>>
>>>>>> *2. How to implement anti-affinity in the AM*
>>>>>> There are two ways we can implement this:
>>>>>> *a. Blacklisting of nodes:* We can group the physical container requests based on anti-affinity requirements and send allocation requests for containers in groups. After the first group is done, blacklist its nodes before sending the second group of container requests. This ensures that containers with anti-affinity requirements are allocated on different nodes. (See the blacklisting sketch appended after this thread.)
>>>>>> *b. Node-specific container requests:* Explore and build a map of the nodes present in the cluster, and send the allocation request for a container on a specific node, honoring anti-affinity. There are a couple of open Yarn JIRAs for node-specific container requests: YARN-1412 <https://issues.apache.org/jira/browse/YARN-1412>, YARN-2027 <https://issues.apache.org/jira/browse/YARN-2027>, so we need to check whether this is a plausible approach. (See the node-specific request sketch below.)
>>>>>>
>>>>>> *3. Strict vs. relaxed anti-affinity*
>>>>>> Depending on cluster resource availability, it may not be possible to honor all of the specified anti-affinity requirements.
>>>>>> *Strict anti-affinity:* the AM will keep trying to allocate containers per the anti-affinity requirements indefinitely. This behavior is similar to how an application stays in the ACCEPTED state until resources to launch it become available in the cluster.
>>>>>> *Relaxed anti-affinity:* the AM will drop the anti-affinity constraint after a certain timeout. (See the timeout sketch below.)
>>>>>> We need a way to set this from the application (either in the operator context, or in DAGContext for an application-wide setting).
>>>>>>
>>>>>> *4. How do we unit test this feature*
>>>>>> We could use Mockito to mock Yarn behavior and test only the AM implementation, since it may not be easy to simulate some scenarios manually in a cluster. Please suggest if there are better ways to test this. (See the Mockito test sketch below.)
>>>>>>
>>>>>> Please suggest improvements or any other ideas on all of the above.
>>>>>>
>>>>>> Thanks!
>>>>>> Isha
>>>>>>
>>>>>> P.S. Sorry for the long email. Please let me know if I should start separate threads for any of the above points.
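As a rough illustration of approach 2a above, the following sketch issues one anti-affinity group of container requests at a time and uses the AMRMClient blacklist so that the new containers stay off the nodes already used by earlier groups. The class and method names are hypothetical, not existing Apex code:

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;

    public class AntiAffinityAllocator
    {
      // Hypothetical helper: request one anti-affinity group, keeping its containers
      // off the nodes already occupied by containers from earlier groups.
      public static void requestNextGroup(AMRMClient<AMRMClient.ContainerRequest> amRmClient,
          List<String> nodesUsedByEarlierGroups, int numContainers, int memoryMb, int vcores)
      {
        // Exclude nodes that already host containers from earlier groups.
        amRmClient.updateBlacklist(nodesUsedByEarlierGroups, Collections.<String>emptyList());

        Resource capability = Resource.newInstance(memoryMb, vcores);
        Priority priority = Priority.newInstance(0);
        for (int i = 0; i < numContainers; i++) {
          // No node or rack constraint here; the blacklist keeps these containers
          // away from the nodes used by earlier groups.
          amRmClient.addContainerRequest(
              new AMRMClient.ContainerRequest(capability, null, null, priority));
        }
        // Once this group has been allocated, the blacklist can be cleared again with
        // updateBlacklist(Collections.<String>emptyList(), nodesUsedByEarlierGroups).
      }
    }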
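Approach 2b could build on the node-constrained ContainerRequest that AMRMClient already exposes. A minimal sketch follows (hypothetical helper; it assumes relaxLocality=false is honored by the scheduler in use, which is exactly the FairScheduler question raised earlier in the thread):

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;

    public class NodeSpecificRequests
    {
      // Hypothetical helper: build a request pinned to a single host, with locality
      // relaxation disabled so the RM does not fall back to other nodes.
      public static AMRMClient.ContainerRequest onHost(String host, int memoryMb, int vcores)
      {
        Resource capability = Resource.newInstance(memoryMb, vcores);
        Priority priority = Priority.newInstance(0);
        // nodes = {host}, racks = null, relaxLocality = false
        return new AMRMClient.ContainerRequest(capability, new String[] {host}, null, priority, false);
      }
    }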
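For the relaxed variant in point 3, one possible mechanism (again only a sketch, not a committed design) is to withdraw a constrained request that is still pending after a timeout and re-issue it without the constraint:

    import org.apache.hadoop.yarn.client.api.AMRMClient;

    public class RelaxedAntiAffinity
    {
      // Hypothetical helper: if a constrained request has been pending longer than
      // the timeout, replace it with an unconstrained request YARN can satisfy anywhere.
      public static void relaxIfExpired(AMRMClient<AMRMClient.ContainerRequest> amRmClient,
          AMRMClient.ContainerRequest strictRequest, long requestedAtMillis, long timeoutMillis)
      {
        if (System.currentTimeMillis() - requestedAtMillis <= timeoutMillis) {
          return; // still within the strict window
        }
        amRmClient.removeContainerRequest(strictRequest);
        amRmClient.addContainerRequest(new AMRMClient.ContainerRequest(
            strictRequest.getCapability(), null, null, strictRequest.getPriority()));
      }
    }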
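And for point 4, a Mockito-based test along these lines could exercise the grouping and blacklisting logic against a mocked AMRMClient without a real cluster (this assumes the hypothetical AntiAffinityAllocator helper sketched above):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.junit.Test;
    import org.mockito.Mockito;

    public class AntiAffinityAllocatorTest
    {
      @Test
      @SuppressWarnings("unchecked")
      public void testNodesOfEarlierGroupsAreBlacklisted() throws Exception
      {
        AMRMClient<AMRMClient.ContainerRequest> amRmClient = Mockito.mock(AMRMClient.class);

        // Request a second group of two containers, pretending node1 already hosts
        // a container from the first group.
        AntiAffinityAllocator.requestNextGroup(amRmClient, Arrays.asList("node1"), 2, 1024, 1);

        // The node used by the first group must be blacklisted before the new requests...
        Mockito.verify(amRmClient).updateBlacklist(Arrays.asList("node1"), Collections.<String>emptyList());
        // ...and two container requests must be issued.
        Mockito.verify(amRmClient, Mockito.times(2))
            .addContainerRequest(Mockito.any(AMRMClient.ContainerRequest.class));
      }
    }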
