http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java index 0e339d8..b3b37e5 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,9 +33,9 @@ import java.io.IOException; * required methods. */ public class BaseInterceptor implements YarnSchedulerInterceptor { - // restrict the constructor - protected BaseInterceptor() { - } + // restrict the constructor + protected BaseInterceptor() { + } @Override public CallBackFilter getCallBackFilter() { @@ -47,22 +47,22 @@ public class BaseInterceptor implements YarnSchedulerInterceptor { }; } - @Override - public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { - } + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + } - @Override - public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { + @Override + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { - } + } - @Override - public void beforeSchedulerEventHandled(SchedulerEvent event) { + @Override + public void beforeSchedulerEventHandled(SchedulerEvent event) { - } + } - @Override - public void afterSchedulerEventHandled(SchedulerEvent event) { + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { - } + } }
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java index 5b7e190..6ae8b7e 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,101 +40,100 @@ import java.util.Map; /** * An interceptor that wraps other interceptors. The Myriad{Fair,Capacity,Fifo}Scheduler classes * instantiate this class and allow interception of the Yarn scheduler events/method calls. - * + * <p/> * The {@link CompositeInterceptor} allows other interceptors to be registered via {@link InterceptorRegistry} * and passes control to the registered interceptors whenever a event/method call is being intercepted. - * */ public class CompositeInterceptor implements YarnSchedulerInterceptor, InterceptorRegistry { - private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class); - private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap(); - private YarnSchedulerInterceptor myriadInitInterceptor; + private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap(); + private YarnSchedulerInterceptor myriadInitInterceptor; - /** - * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of - * {@link MyriadInitializationInterceptor}. - */ - public CompositeInterceptor() { - this.myriadInitInterceptor = new MyriadInitializationInterceptor(this); - } + /** + * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of + * {@link MyriadInitializationInterceptor}. + */ + public CompositeInterceptor() { + this.myriadInitInterceptor = new MyriadInitializationInterceptor(this); + } - @VisibleForTesting - public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) { - this.myriadInitInterceptor = myriadInitInterceptor; - } + @VisibleForTesting + public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) { + this.myriadInitInterceptor = myriadInitInterceptor; + } - @Override - public void register(YarnSchedulerInterceptor interceptor) { - interceptors.put(interceptor.getClass(), interceptor); - LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName()); - } + @Override + public void register(YarnSchedulerInterceptor interceptor) { + interceptors.put(interceptor.getClass(), interceptor); + LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName()); + } - @Override - public CallBackFilter getCallBackFilter() { - return new CallBackFilter() { - @Override - public boolean allowCallBacksForNode(NodeId nodeManager) { - return true; - } - }; - } + @Override + public CallBackFilter getCallBackFilter() { + return new CallBackFilter() { + @Override + public boolean allowCallBacksForNode(NodeId nodeManager) { + return true; + } + }; + } /** - * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized, - * other interceptors will later register with this class via - * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}. - * - * @param conf - * @param yarnScheduler - * @param rmContext - * @throws IOException - */ - @Override - public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { - myriadInitInterceptor.init(conf, yarnScheduler, rmContext); - } + * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized, + * other interceptors will later register with this class via + * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}. + * + * @param conf + * @param yarnScheduler + * @param rmContext + * @throws IOException + */ + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + myriadInitInterceptor.init(conf, yarnScheduler, rmContext); + } - @Override - public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { - for (YarnSchedulerInterceptor interceptor : interceptors.values()) { - if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) { - interceptor.beforeRMNodeEventHandled(event, context); - } - } + @Override + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) { + interceptor.beforeRMNodeEventHandled(event, context); + } } + } - @Override - public void beforeSchedulerEventHandled(SchedulerEvent event) { - for (YarnSchedulerInterceptor interceptor : interceptors.values()) { - final NodeId nodeId = getNodeIdForSchedulerEvent(event); - if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { - interceptor.beforeSchedulerEventHandled(event); - } - } + @Override + public void beforeSchedulerEventHandled(SchedulerEvent event) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + final NodeId nodeId = getNodeIdForSchedulerEvent(event); + if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { + interceptor.beforeSchedulerEventHandled(event); + } } + } - @Override - public void afterSchedulerEventHandled(SchedulerEvent event) { - for (YarnSchedulerInterceptor interceptor : interceptors.values()) { - NodeId nodeId = getNodeIdForSchedulerEvent(event); - if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { - interceptor.afterSchedulerEventHandled(event); - } - } + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + NodeId nodeId = getNodeIdForSchedulerEvent(event); + if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { + interceptor.afterSchedulerEventHandled(event); + } } + } private NodeId getNodeIdForSchedulerEvent(SchedulerEvent event) { - switch (event.getType()) { - case NODE_ADDED: - return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID(); - case NODE_REMOVED: - return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID(); - case NODE_UPDATE: - return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID(); - case NODE_RESOURCE_UPDATE: - return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID(); - } - return null; + switch (event.getType()) { + case NODE_ADDED: + return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID(); + case NODE_REMOVED: + return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID(); + case NODE_UPDATE: + return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID(); + case NODE_RESOURCE_UPDATE: + return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID(); } + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java index 433b6cd..3a504b1 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,6 @@ package com.ebay.myriad.scheduler.yarn.interceptor; */ public interface InterceptorRegistry { - public void register(YarnSchedulerInterceptor interceptor); + public void register(YarnSchedulerInterceptor interceptor); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java index d28259d..71580b3 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,27 +32,27 @@ import java.io.IOException; * Responsible for intializing myriad. */ public class MyriadInitializationInterceptor extends BaseInterceptor { - private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class); - private final InterceptorRegistry registry; + private final InterceptorRegistry registry; - public MyriadInitializationInterceptor(InterceptorRegistry registry) { - this.registry = registry; - } + public MyriadInitializationInterceptor(InterceptorRegistry registry) { + this.registry = registry; + } - /** - * Initialize Myriad plugin before RM's scheduler is initialized. - * This includes registration with Mesos master, initialization of - * the myriad web application, initializing guice modules etc. - */ - @Override - public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { - try { - Main.initialize(conf, yarnScheduler, rmContext, registry); - } catch (Exception e) { - // Abort bringing up RM - throw new RuntimeException("Failed to initialize myriad", e); - } - LOGGER.info("Initialized myriad."); + /** + * Initialize Myriad plugin before RM's scheduler is initialized. + * This includes registration with Mesos master, initialization of + * the myriad web application, initializing guice modules etc. + */ + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + try { + Main.initialize(conf, yarnScheduler, rmContext, registry); + } catch (Exception e) { + // Abort bringing up RM + throw new RuntimeException("Failed to initialize myriad", e); } + LOGGER.info("Initialized myriad."); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java index 0afcb7f..e28d052 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,62 +34,62 @@ import java.io.IOException; */ public interface YarnSchedulerInterceptor { + /** + * Filters the method callbacks. + */ + interface CallBackFilter { /** - * Filters the method callbacks. - */ - interface CallBackFilter { - /** - * Method to determine if any other methods in {@link YarnSchedulerInterceptor} - * pertaining to a given node manager should be invoked or not. - * - * @param nodeManager NodeId of the Node Manager registered with RM. - * @return true to allow invoking further interceptor methods. false otherwise. - */ - public boolean allowCallBacksForNode(NodeId nodeManager); - } - - /** - * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)} - * method is invoked to *determine* if any of the other methods pertaining to a specific node - * needs to be invoked or not. + * Method to determine if any other methods in {@link YarnSchedulerInterceptor} + * pertaining to a given node manager should be invoked or not. * - * @return + * @param nodeManager NodeId of the Node Manager registered with RM. + * @return true to allow invoking further interceptor methods. false otherwise. */ - public CallBackFilter getCallBackFilter(); + public boolean allowCallBacksForNode(NodeId nodeManager); + } - /** - * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)} - * - * @param conf - * @param yarnScheduler - * @param rmContext - * @throws IOException - */ - public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException; + /** + * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)} + * method is invoked to *determine* if any of the other methods pertaining to a specific node + * needs to be invoked or not. + * + * @return + */ + public CallBackFilter getCallBackFilter(); - /** - * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if - * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. - * - * @param event - * @param context - */ - public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context); + /** + * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)} + * + * @param conf + * @param yarnScheduler + * @param rmContext + * @throws IOException + */ + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException; - /** - * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if - * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. - * - * @param event - */ - public void beforeSchedulerEventHandled(SchedulerEvent event); + /** + * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + * @param context + */ + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context); - /** - * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if - * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. - * - * @param event - */ - public void afterSchedulerEventHandled(SchedulerEvent event); + /** + * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + */ + public void beforeSchedulerEventHandled(SchedulerEvent event); + + /** + * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + */ + public void afterSchedulerEventHandled(SchedulerEvent event); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java index 1f2fe41..687b491 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,72 +28,72 @@ import java.util.UUID; * Model which represents the configuration of a cluster */ public class Cluster { - private String clusterId; - private String clusterName; - private Collection<NodeTask> nodes; - private String resourceManagerHost; - private String resourceManagerPort; - private double minQuota; - - public Cluster() { - this.clusterId = UUID.randomUUID().toString(); - this.nodes = new HashSet<>(); - } - - public String getClusterId() { - return clusterId; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - public Collection<NodeTask> getNodes() { - return nodes; - } - - public void addNode(NodeTask node) { - this.nodes.add(node); - } - - public void addNodes(Collection<NodeTask> nodes) { - this.nodes.addAll(nodes); - } - - public void removeNode(NodeTask task) { - this.nodes.remove(task); - } - - public String getResourceManagerHost() { - return resourceManagerHost; - } - - public void setResourceManagerHost(String resourceManagerHost) { - this.resourceManagerHost = resourceManagerHost; - } - - public String getResourceManagerPort() { - return resourceManagerPort; - } - - public void setResourceManagerPort(String resourceManagerPort) { - this.resourceManagerPort = resourceManagerPort; - } - - public double getMinQuota() { - return minQuota; - } - - public void setMinQuota(double minQuota) { - this.minQuota = minQuota; - } - - public String toString() { - Gson gson = new Gson(); - return gson.toJson(this); - } + private String clusterId; + private String clusterName; + private Collection<NodeTask> nodes; + private String resourceManagerHost; + private String resourceManagerPort; + private double minQuota; + + public Cluster() { + this.clusterId = UUID.randomUUID().toString(); + this.nodes = new HashSet<>(); + } + + public String getClusterId() { + return clusterId; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public Collection<NodeTask> getNodes() { + return nodes; + } + + public void addNode(NodeTask node) { + this.nodes.add(node); + } + + public void addNodes(Collection<NodeTask> nodes) { + this.nodes.addAll(nodes); + } + + public void removeNode(NodeTask task) { + this.nodes.remove(task); + } + + public String getResourceManagerHost() { + return resourceManagerHost; + } + + public void setResourceManagerHost(String resourceManagerHost) { + this.resourceManagerHost = resourceManagerHost; + } + + public String getResourceManagerPort() { + return resourceManagerPort; + } + + public void setResourceManagerPort(String resourceManagerPort) { + this.resourceManagerPort = resourceManagerPort; + } + + public double getMinQuota() { + return minQuota; + } + + public void setMinQuota(double minQuota) { + this.minQuota = minQuota; + } + + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java index 550c8cc..5e868f9 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,27 +30,27 @@ import java.util.concurrent.ExecutionException; * Model that represents the state of Myriad */ public class MyriadState { - public static final String KEY_FRAMEWORK_ID = "frameworkId"; + public static final String KEY_FRAMEWORK_ID = "frameworkId"; - private State stateStore; + private State stateStore; - public MyriadState(State stateStore) { - this.stateStore = stateStore; - } + public MyriadState(State stateStore) { + this.stateStore = stateStore; + } - public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException { - byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value(); + public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException { + byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value(); - if (frameworkId.length > 0) { - return Protos.FrameworkID.parseFrom(frameworkId); - } else { - return null; - } + if (frameworkId.length > 0) { + return Protos.FrameworkID.parseFrom(frameworkId); + } else { + return null; } + } - public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException { - Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get(); - frameworkId = frameworkId.mutate(newFrameworkId.toByteArray()); - stateStore.store(frameworkId).get(); - } + public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException { + Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get(); + frameworkId = frameworkId.mutate(newFrameworkId.toByteArray()); + stateStore.store(frameworkId).get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java index d02fab9..99ab327 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,7 @@ package com.ebay.myriad.state; import com.ebay.myriad.state.utils.StoreContext; /** - * Interface implemented by all Myriad State Store implementations + * Interface implemented by all Myriad State Store implementations */ public interface MyriadStateStore { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java index 354c575..d784092 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,90 +32,90 @@ import org.apache.mesos.Protos.Attribute; * Represents a task to be launched by the executor */ public class NodeTask { - @JsonProperty - private String hostname; - @JsonProperty - private Protos.SlaveID slaveId; - @JsonProperty - private Protos.TaskStatus taskStatus; - @JsonProperty - private String taskPrefix; - @JsonProperty - private ServiceResourceProfile serviceresourceProfile; - - @Inject - TaskUtils taskUtils; - /** - * Mesos executor for this node. - */ - private Protos.ExecutorInfo executorInfo; - - private Constraint constraint; - private List<Attribute> slaveAttributes; - - public NodeTask(ServiceResourceProfile profile, Constraint constraint) { - this.serviceresourceProfile = profile; - this.hostname = ""; - this.constraint = constraint; - } - - public Protos.SlaveID getSlaveId() { - return slaveId; - } - - public void setSlaveId(Protos.SlaveID slaveId) { - this.slaveId = slaveId; - } - - public Constraint getConstraint() { - return constraint; - } - - public String getHostname() { - return this.hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public Protos.TaskStatus getTaskStatus() { - return taskStatus; - } - - public void setTaskStatus(Protos.TaskStatus taskStatus) { - this.taskStatus = taskStatus; - } - - public Protos.ExecutorInfo getExecutorInfo() { - return executorInfo; - } - - public void setExecutorInfo(Protos.ExecutorInfo executorInfo) { - this.executorInfo = executorInfo; - } - - public void setSlaveAttributes(List<Attribute> slaveAttributes) { - this.slaveAttributes = slaveAttributes; - } - - public List<Attribute> getSlaveAttributes() { - return slaveAttributes; - } - - public String getTaskPrefix() { - return taskPrefix; - } - - public void setTaskPrefix(String taskPrefix) { - this.taskPrefix = taskPrefix; - } - - public ServiceResourceProfile getProfile() { - return serviceresourceProfile; - } - - public void setProfile(ServiceResourceProfile serviceresourceProfile) { - this.serviceresourceProfile = serviceresourceProfile; - } + @JsonProperty + private String hostname; + @JsonProperty + private Protos.SlaveID slaveId; + @JsonProperty + private Protos.TaskStatus taskStatus; + @JsonProperty + private String taskPrefix; + @JsonProperty + private ServiceResourceProfile serviceresourceProfile; + + @Inject + TaskUtils taskUtils; + /** + * Mesos executor for this node. + */ + private Protos.ExecutorInfo executorInfo; + + private Constraint constraint; + private List<Attribute> slaveAttributes; + + public NodeTask(ServiceResourceProfile profile, Constraint constraint) { + this.serviceresourceProfile = profile; + this.hostname = ""; + this.constraint = constraint; + } + + public Protos.SlaveID getSlaveId() { + return slaveId; + } + + public void setSlaveId(Protos.SlaveID slaveId) { + this.slaveId = slaveId; + } + + public Constraint getConstraint() { + return constraint; + } + + public String getHostname() { + return this.hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public Protos.TaskStatus getTaskStatus() { + return taskStatus; + } + + public void setTaskStatus(Protos.TaskStatus taskStatus) { + this.taskStatus = taskStatus; + } + + public Protos.ExecutorInfo getExecutorInfo() { + return executorInfo; + } + + public void setExecutorInfo(Protos.ExecutorInfo executorInfo) { + this.executorInfo = executorInfo; + } + + public void setSlaveAttributes(List<Attribute> slaveAttributes) { + this.slaveAttributes = slaveAttributes; + } + + public List<Attribute> getSlaveAttributes() { + return slaveAttributes; + } + + public String getTaskPrefix() { + return taskPrefix; + } + + public void setTaskPrefix(String taskPrefix) { + this.taskPrefix = taskPrefix; + } + + public ServiceResourceProfile getProfile() { + return serviceresourceProfile; + } + + public void setProfile(ServiceResourceProfile serviceresourceProfile) { + this.serviceresourceProfile = serviceresourceProfile; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index 08a4dfa..99a7506 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,411 +44,398 @@ import org.slf4j.LoggerFactory; * Represents the state of the Myriad scheduler */ public class SchedulerState { - private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class); - - private static Pattern taskIdPattern = Pattern.compile("\\."); - - private Map<Protos.TaskID, NodeTask> tasks; - private Protos.FrameworkID frameworkId; - private MyriadStateStore stateStore; - private Map<String, SchedulerStateForType> statesForTaskType; - - public SchedulerState(MyriadStateStore stateStore) { - this.tasks = new ConcurrentHashMap<>(); - this.stateStore = stateStore; - this.statesForTaskType = new ConcurrentHashMap<>(); - loadStateStore(); - } - - /** - * Making method synchronized, so if someone tries flexup/down at the same time - * addNodes and removeTask will not put data into an inconsistent state - * @param nodes - */ - public synchronized void addNodes(Collection<NodeTask> nodes) { - if (CollectionUtils.isEmpty(nodes)) { - LOGGER.info("No nodes to add"); - return; - } - for (NodeTask node : nodes) { - Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID())) - .build(); - addTask(taskId, node); - SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix()); - LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), - (taskState == null ? 0 : taskState.getPendingTaskIds().size())); - makeTaskPending(taskId); - } + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class); - } + private static Pattern taskIdPattern = Pattern.compile("\\."); - // TODO (sdaingade) Clone NodeTask - public synchronized void addTask(Protos.TaskID taskId, NodeTask node) { - this.tasks.put(taskId, node); - updateStateStore(); - } + private Map<Protos.TaskID, NodeTask> tasks; + private Protos.FrameworkID frameworkId; + private MyriadStateStore stateStore; + private Map<String, SchedulerStateForType> statesForTaskType; - public synchronized void updateTask(Protos.TaskStatus taskStatus) { - Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null"); - Protos.TaskID taskId = taskStatus.getTaskId(); - if (this.tasks.containsKey(taskId)) { - this.tasks.get(taskId).setTaskStatus(taskStatus); - } - updateStateStore(); - } + public SchedulerState(MyriadStateStore stateStore) { + this.tasks = new ConcurrentHashMap<>(); + this.stateStore = stateStore; + this.statesForTaskType = new ConcurrentHashMap<>(); + loadStateStore(); + } - public synchronized void makeTaskPending(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskPending(taskId); - updateStateStore(); + /** + * Making method synchronized, so if someone tries flexup/down at the same time + * addNodes and removeTask will not put data into an inconsistent state + * + * @param nodes + */ + public synchronized void addNodes(Collection<NodeTask> nodes) { + if (CollectionUtils.isEmpty(nodes)) { + LOGGER.info("No nodes to add"); + return; + } + for (NodeTask node : nodes) { + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID())).build(); + addTask(taskId, node); + SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix()); + LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), (taskState == null ? 0 : taskState.getPendingTaskIds().size())); + makeTaskPending(taskId); } - public synchronized void makeTaskStaging(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskStaging(taskId); - updateStateStore(); - } + } - public synchronized void makeTaskActive(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskActive(taskId); - updateStateStore(); - } + // TODO (sdaingade) Clone NodeTask + public synchronized void addTask(Protos.TaskID taskId, NodeTask node) { + this.tasks.put(taskId, node); + updateStateStore(); + } - public synchronized void makeTaskLost(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskLost(taskId); - updateStateStore(); + public synchronized void updateTask(Protos.TaskStatus taskStatus) { + Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null"); + Protos.TaskID taskId = taskStatus.getTaskId(); + if (this.tasks.containsKey(taskId)) { + this.tasks.get(taskId).setTaskStatus(taskStatus); } + updateStateStore(); + } - public synchronized void makeTaskKillable(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskKillable(taskId); - updateStateStore(); + public synchronized void makeTaskPending(Protos.TaskID taskId) { + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); } + taskTypeState.makeTaskPending(taskId); + updateStateStore(); + } - // TODO (sdaingade) Clone NodeTask - public synchronized NodeTask getTask(Protos.TaskID taskId) { - return this.tasks.get(taskId); + public synchronized void makeTaskStaging(Protos.TaskID taskId) { + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); } + taskTypeState.makeTaskStaging(taskId); + updateStateStore(); + } - public synchronized Set<Protos.TaskID> getKillableTasks() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getKillableTasks()); - } - return returnSet; + public synchronized void makeTaskActive(Protos.TaskID taskId) { + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); } + taskTypeState.makeTaskActive(taskId); + updateStateStore(); + } - public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks()); + public synchronized void makeTaskLost(Protos.TaskID taskId) { + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); } - - public synchronized void removeTask(Protos.TaskID taskId) { - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState != null) { - taskTypeState.removeTask(taskId); - } - this.tasks.remove(taskId); - updateStateStore(); + taskTypeState.makeTaskLost(taskId); + updateStateStore(); + } + + public synchronized void makeTaskKillable(Protos.TaskID taskId) { + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); } + taskTypeState.makeTaskKillable(taskId); + updateStateStore(); + } - public synchronized Set<Protos.TaskID> getPendingTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getPendingTaskIds()); - } - return returnSet; + // TODO (sdaingade) Clone NodeTask + public synchronized NodeTask getTask(Protos.TaskID taskId) { + return this.tasks.get(taskId); + } + + public synchronized Set<Protos.TaskID> getKillableTasks() { + Set<Protos.TaskID> returnSet = new HashSet<>(); + for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { + returnSet.addAll(entry.getValue().getKillableTasks()); } + return returnSet; + } - public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> pendingTaskIds = new ArrayList<>(); - Set<Protos.TaskID> pendingTasks = getPendingTaskIds(); - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - pendingTaskIds.add(entry.getKey()); - } - } - return Collections.unmodifiableCollection(pendingTaskIds); + public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) { + SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks()); + } + + public synchronized void removeTask(Protos.TaskID taskId) { + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState != null) { + taskTypeState.removeTask(taskId); } + this.tasks.remove(taskId); + updateStateStore(); + } - public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds()); + public synchronized Set<Protos.TaskID> getPendingTaskIds() { + Set<Protos.TaskID> returnSet = new HashSet<>(); + for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { + returnSet.addAll(entry.getValue().getPendingTaskIds()); } + return returnSet; + } - public synchronized Set<Protos.TaskID> getActiveTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getActiveTaskIds()); + public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { + List<Protos.TaskID> pendingTaskIds = new ArrayList<>(); + Set<Protos.TaskID> pendingTasks = getPendingTaskIds(); + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + NodeTask nodeTask = entry.getValue(); + if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { + pendingTaskIds.add(entry.getKey()); } - return returnSet; } + return Collections.unmodifiableCollection(pendingTaskIds); + } - public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds()); - } + public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) { + SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds()); + } - public synchronized Set<NodeTask> getActiveTasks() { - return getTasks(getActiveTaskIds()); + public synchronized Set<Protos.TaskID> getActiveTaskIds() { + Set<Protos.TaskID> returnSet = new HashSet<>(); + for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { + returnSet.addAll(entry.getValue().getActiveTaskIds()); } + return returnSet; + } - public Set<NodeTask> getActiveTasksByType(String taskPrefix) { - return getTasks(getActiveTaskIds(taskPrefix)); - } + public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) { + SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds()); + } - public Set<NodeTask> getStagingTasks() { - return getTasks(getStagingTaskIds()); - } + public synchronized Set<NodeTask> getActiveTasks() { + return getTasks(getActiveTaskIds()); + } - public Set<NodeTask> getStagingTasksByType(String taskPrefix) { - return getTasks(getStagingTaskIds(taskPrefix)); - } + public Set<NodeTask> getActiveTasksByType(String taskPrefix) { + return getTasks(getActiveTaskIds(taskPrefix)); + } - public Set<NodeTask> getPendingTasksByType(String taskPrefix) { - return getTasks(getPendingTaskIds(taskPrefix)); - } + public Set<NodeTask> getStagingTasks() { + return getTasks(getStagingTaskIds()); + } - public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) { - Set<NodeTask> nodeTasks = new HashSet<>(); - if (CollectionUtils.isNotEmpty(taskIds) - && CollectionUtils.isNotEmpty(tasks.values())) { - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - if (taskIds.contains(entry.getKey())) { - nodeTasks.add(entry.getValue()); - } - } - } - return Collections.unmodifiableSet(nodeTasks); - } - - public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> activeTaskIDs = new ArrayList<>(); - Set<Protos.TaskID> activeTaskIds = getActiveTaskIds(); - if (CollectionUtils.isNotEmpty(activeTaskIds) - && CollectionUtils.isNotEmpty(tasks.values())) { - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - activeTaskIDs.add(entry.getKey()); - } - } - } - return Collections.unmodifiableCollection(activeTaskIDs); - } + public Set<NodeTask> getStagingTasksByType(String taskPrefix) { + return getTasks(getStagingTaskIds(taskPrefix)); + } - // TODO (sdaingade) Clone NodeTask - public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) { - if (taskPrefix == null) { - return null; - } + public Set<NodeTask> getPendingTasksByType(String taskPrefix) { + return getTasks(getPendingTaskIds(taskPrefix)); + } + + public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) { + Set<NodeTask> nodeTasks = new HashSet<>(); + if (CollectionUtils.isNotEmpty(taskIds) && CollectionUtils.isNotEmpty(tasks.values())) { for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - final NodeTask task = entry.getValue(); - if (task.getSlaveId() != null && - task.getSlaveId().equals(slaveId) && - taskPrefix.equals(task.getTaskPrefix())) { - return entry.getValue(); + if (taskIds.contains(entry.getKey())) { + nodeTasks.add(entry.getValue()); } } - return null; } + return Collections.unmodifiableSet(nodeTasks); + } - public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) { - Set<NodeTask> nodeTasks = Sets.newHashSet(); + public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) { + List<Protos.TaskID> activeTaskIDs = new ArrayList<>(); + Set<Protos.TaskID> activeTaskIds = getActiveTaskIds(); + if (CollectionUtils.isNotEmpty(activeTaskIds) && CollectionUtils.isNotEmpty(tasks.values())) { for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - final NodeTask task = entry.getValue(); - if (task.getSlaveId() != null && - task.getSlaveId().equals(slaveId)) { - nodeTasks.add(entry.getValue()); + NodeTask nodeTask = entry.getValue(); + if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { + activeTaskIDs.add(entry.getKey()); } } - return nodeTasks; } + return Collections.unmodifiableCollection(activeTaskIDs); + } - public Set<Protos.TaskID> getStagingTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getStagingTaskIds()); + // TODO (sdaingade) Clone NodeTask + public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) { + if (taskPrefix == null) { + return null; + } + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + final NodeTask task = entry.getValue(); + if (task.getSlaveId() != null && + task.getSlaveId().equals(slaveId) && + taskPrefix.equals(task.getTaskPrefix())) { + return entry.getValue(); } - return returnSet; } + return null; + } - public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> stagingTaskIDs = new ArrayList<>(); - - Set<Protos.TaskID> stagingTasks = getStagingTaskIds(); - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - stagingTaskIDs.add(entry.getKey()); - } + public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) { + Set<NodeTask> nodeTasks = Sets.newHashSet(); + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + final NodeTask task = entry.getValue(); + if (task.getSlaveId() != null && task.getSlaveId().equals(slaveId)) { + nodeTasks.add(entry.getValue()); } - return Collections.unmodifiableCollection(stagingTaskIDs); } + return nodeTasks; + } - public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds()); + public Set<Protos.TaskID> getStagingTaskIds() { + Set<Protos.TaskID> returnSet = new HashSet<>(); + for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { + returnSet.addAll(entry.getValue().getStagingTaskIds()); } - - public Set<Protos.TaskID> getLostTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getLostTaskIds()); + return returnSet; + } + + public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { + List<Protos.TaskID> stagingTaskIDs = new ArrayList<>(); + + Set<Protos.TaskID> stagingTasks = getStagingTaskIds(); + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + NodeTask nodeTask = entry.getValue(); + if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { + stagingTaskIDs.add(entry.getKey()); } - return returnSet; - } - - public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds()); - } - - // TODO (sdaingade) Currently cannot return unmodifiableCollection - // as this will break ReconcileService code - public synchronized Collection<Protos.TaskStatus> getTaskStatuses() { - Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size()); - Collection<NodeTask> tasks = this.tasks.values(); - for (NodeTask task : tasks) { - Protos.TaskStatus taskStatus = task.getTaskStatus(); - if (taskStatus != null) { - taskStatuses.add(taskStatus); - } - } + } + return Collections.unmodifiableCollection(stagingTaskIDs); + } - return taskStatuses; + public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) { + SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds()); + } + + public Set<Protos.TaskID> getLostTaskIds() { + Set<Protos.TaskID> returnSet = new HashSet<>(); + for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { + returnSet.addAll(entry.getValue().getLostTaskIds()); } + return returnSet; + } + + public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) { + SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds()); + } - public synchronized boolean hasTask(Protos.TaskID taskID) { - return this.tasks.containsKey(taskID); + // TODO (sdaingade) Currently cannot return unmodifiableCollection + // as this will break ReconcileService code + public synchronized Collection<Protos.TaskStatus> getTaskStatuses() { + Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size()); + Collection<NodeTask> tasks = this.tasks.values(); + for (NodeTask task : tasks) { + Protos.TaskStatus taskStatus = task.getTaskStatus(); + if (taskStatus != null) { + taskStatuses.add(taskStatus); + } } - public synchronized Protos.FrameworkID getFrameworkID() { - return this.frameworkId; + return taskStatuses; + } + + public synchronized boolean hasTask(Protos.TaskID taskID) { + return this.tasks.containsKey(taskID); + } + + public synchronized Protos.FrameworkID getFrameworkID() { + return this.frameworkId; + } + + public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) { + this.frameworkId = newFrameworkId; + updateStateStore(); + } + + private synchronized void updateStateStore() { + if (this.stateStore == null) { + LOGGER.debug("Could not update state to state store as HA is disabled"); + return; } - public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) { - this.frameworkId = newFrameworkId; - updateStateStore(); + try { + StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks()); + stateStore.storeMyriadState(sc); + } catch (Exception e) { + LOGGER.error("Failed to update scheduler state to state store", e); } + } - private synchronized void updateStateStore() { - if (this.stateStore == null) { - LOGGER.debug("Could not update state to state store as HA is disabled"); - return; - } + private synchronized void loadStateStore() { + if (this.stateStore == null) { + LOGGER.debug("Could not load state from state store as HA is disabled"); + return; + } + + try { + StoreContext sc = stateStore.loadMyriadState(); + if (sc != null) { + this.frameworkId = sc.getFrameworkId(); + this.tasks.putAll(sc.getTasks()); + convertToThis(TaskState.PENDING, sc.getPendingTasks()); + convertToThis(TaskState.STAGING, sc.getStagingTasks()); + convertToThis(TaskState.ACTIVE, sc.getActiveTasks()); + convertToThis(TaskState.LOST, sc.getLostTasks()); + convertToThis(TaskState.KILLABLE, sc.getKillableTasks()); + LOGGER.info("Loaded Myriad state from state store successfully."); + LOGGER.debug("State Store state includes " + + "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " + + "active tasks count: {}, lost tasks count: {}, " + + "and killable tasks count: {}", frameworkId.getValue(), this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), this.getLostTaskIds().size(), this.getKillableTasks().size()); + } + } catch (Exception e) { + LOGGER.error("Failed to read scheduler state from state store", e); + } + } - try { - StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), - getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks()); - stateStore.storeMyriadState(sc); - } catch (Exception e) { - LOGGER.error("Failed to update scheduler state to state store", e); - } + private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) { + for (Protos.TaskID taskId : taskIds) { + String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; + SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); + if (taskTypeState == null) { + taskTypeState = new SchedulerStateForType(taskPrefix); + statesForTaskType.put(taskPrefix, taskTypeState); + } + switch (taskType) { + case PENDING: + taskTypeState.makeTaskPending(taskId); + break; + case STAGING: + taskTypeState.makeTaskStaging(taskId); + break; + case ACTIVE: + taskTypeState.makeTaskActive(taskId); + break; + case KILLABLE: + taskTypeState.makeTaskKillable(taskId); + break; + case LOST: + taskTypeState.makeTaskLost(taskId); + break; + } } + } - private synchronized void loadStateStore() { - if (this.stateStore == null) { - LOGGER.debug("Could not load state from state store as HA is disabled"); - return; - } + /** + * Class to keep all the tasks states for a particular taskPrefix together + */ + private static class SchedulerStateForType { - try { - StoreContext sc = stateStore.loadMyriadState(); - if (sc != null) { - this.frameworkId = sc.getFrameworkId(); - this.tasks.putAll(sc.getTasks()); - convertToThis(TaskState.PENDING, sc.getPendingTasks()); - convertToThis(TaskState.STAGING, sc.getStagingTasks()); - convertToThis(TaskState.ACTIVE, sc.getActiveTasks()); - convertToThis(TaskState.LOST, sc.getLostTasks()); - convertToThis(TaskState.KILLABLE, sc.getKillableTasks()); - LOGGER.info("Loaded Myriad state from state store successfully."); - LOGGER.debug("State Store state includes " + - "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " + - "active tasks count: {}, lost tasks count: {}, " + - "and killable tasks count: {}", frameworkId.getValue(), - this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), - this.getActiveTaskIds().size(), this.getLostTaskIds().size(), - this.getKillableTasks().size()); - } - } catch (Exception e) { - LOGGER.error("Failed to read scheduler state from state store", e); - } - } - - private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) { - for (Protos.TaskID taskId : taskIds) { - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - switch(taskType) { - case PENDING: - taskTypeState.makeTaskPending(taskId); - break; - case STAGING: - taskTypeState.makeTaskStaging(taskId); - break; - case ACTIVE: - taskTypeState.makeTaskActive(taskId); - break; - case KILLABLE: - taskTypeState.makeTaskKillable(taskId); - break; - case LOST: - taskTypeState.makeTaskLost(taskId); - break; - } - } - } - /** - * Class to keep all the tasks states for a particular taskPrefix together - * - */ - private static class SchedulerStateForType { - private final String taskPrefix; private Set<Protos.TaskID> pendingTasks; private Set<Protos.TaskID> stagingTasks; @@ -467,15 +454,15 @@ public class SchedulerState { this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); } + @SuppressWarnings("unused") public String getTaskPrefix() { return taskPrefix; } - + public synchronized void makeTaskPending(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + pendingTasks.add(taskId); stagingTasks.remove(taskId); activeTasks.remove(taskId); @@ -484,18 +471,16 @@ public class SchedulerState { } public synchronized void makeTaskStaging(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); - pendingTasks.remove(taskId); - stagingTasks.add(taskId); - activeTasks.remove(taskId); - lostTasks.remove(taskId); - killableTasks.remove(taskId); + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); + pendingTasks.remove(taskId); + stagingTasks.add(taskId); + activeTasks.remove(taskId); + lostTasks.remove(taskId); + killableTasks.remove(taskId); } public synchronized void makeTaskActive(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); pendingTasks.remove(taskId); stagingTasks.remove(taskId); activeTasks.add(taskId); @@ -504,8 +489,7 @@ public class SchedulerState { } public synchronized void makeTaskLost(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); pendingTasks.remove(taskId); stagingTasks.remove(taskId); activeTasks.remove(taskId); @@ -514,15 +498,14 @@ public class SchedulerState { } public synchronized void makeTaskKillable(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, - "taskId cannot be empty or null"); + Objects.requireNonNull(taskId, "taskId cannot be empty or null"); pendingTasks.remove(taskId); stagingTasks.remove(taskId); activeTasks.remove(taskId); lostTasks.remove(taskId); killableTasks.add(taskId); } - + public synchronized void removeTask(Protos.TaskID taskId) { this.pendingTasks.remove(taskId); this.stagingTasks.remove(taskId); @@ -530,7 +513,7 @@ public class SchedulerState { this.lostTasks.remove(taskId); this.killableTasks.remove(taskId); } - + public synchronized Set<Protos.TaskID> getPendingTaskIds() { return Collections.unmodifiableSet(this.pendingTasks); } @@ -552,15 +535,15 @@ public class SchedulerState { } } - /** - * TaskState type - * - */ - public enum TaskState { - PENDING, - STAGING, - ACTIVE, - KILLABLE, - LOST - } + + /** + * TaskState type + */ + public enum TaskState { + PENDING, + STAGING, + ACTIVE, + KILLABLE, + LOST + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java index 4711a52..fcf2cb8 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,20 +39,17 @@ import com.google.gson.GsonBuilder; import com.google.protobuf.GeneratedMessage; /** -* ByteBuffer support for the Serialization of the StoreContext -*/ + * ByteBuffer support for the Serialization of the StoreContext + */ public class ByteBufferSupport { public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; public static final String UTF8 = "UTF-8"; public static final byte[] ZERO_BYTES = new byte[0]; private static Gson gson = new Gson(); - private static Gson gsonCustom = new GsonBuilder(). - registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()). - create(); - - public static void addByteBuffers(List<ByteBuffer> list, - ByteArrayOutputStream bytes) throws IOException { + private static Gson gsonCustom = new GsonBuilder().registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).create(); + + public static void addByteBuffers(List<ByteBuffer> list, ByteArrayOutputStream bytes) throws IOException { // If list, add the list size, then the size of each buffer followed by the buffer. if (list != null) { bytes.write(toIntBytes(list.size())); @@ -64,8 +61,7 @@ public class ByteBufferSupport { } } - public static void addByteBuffer(ByteBuffer bb, - ByteArrayOutputStream bytes) throws IOException { + public static void addByteBuffer(ByteBuffer bb, ByteArrayOutputStream bytes) throws IOException { if (bb != null && bytes != null) { bytes.write(toIntBytes(bb.array().length)); bytes.write(bb.array()); @@ -139,11 +135,11 @@ public class ByteBufferSupport { } else { size += INT_SIZE; } - + if (nt.getExecutorInfo() != null) { - size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE; + size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE; } else { - size += INT_SIZE; + size += INT_SIZE; } byte[] taskPrefixBytes = ZERO_BYTES; @@ -151,7 +147,7 @@ public class ByteBufferSupport { taskPrefixBytes = toBytes(nt.getTaskPrefix()); size += taskPrefixBytes.length + INT_SIZE; } - + // Allocate and populate the buffer. ByteBuffer bb = createBuffer(size); putBytes(bb, profile); @@ -197,11 +193,11 @@ public class ByteBufferSupport { /** * ByteBuffer is expected to have a NodeTask at its next position. - * - * @param bb - * @return NodeTask or null if buffer is empty. Can throw a RuntimeException - * if the buffer is not formatted correctly. - */ + * + * @param bb + * @return NodeTask or null if buffer is empty. Can throw a RuntimeException + * if the buffer is not formatted correctly. + */ public static NodeTask toNodeTask(ByteBuffer bb) { NodeTask nt = null; if (bb != null && bb.array().length > 0) { @@ -261,7 +257,7 @@ public class ByteBufferSupport { * @return string from the next position, or "" if the size is zero */ public static String toString(ByteBuffer bb) { - byte [] bytes = new byte[bb.getInt()]; + byte[] bytes = new byte[bb.getInt()]; String s = ""; try { if (bytes.length > 0) { @@ -269,8 +265,7 @@ public class ByteBufferSupport { s = new String(bytes, UTF8); } } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + - " failed to parse string bytes", e); + throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse string bytes", e); } return s; } @@ -314,8 +309,7 @@ public class ByteBufferSupport { try { return Protos.SlaveID.parseFrom(getBytes(bb, size)); } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + - " failed to parse SlaveId bytes", e); + throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse SlaveId bytes", e); } } else { return null; @@ -328,8 +322,7 @@ public class ByteBufferSupport { try { return Protos.TaskStatus.parseFrom(getBytes(bb, size)); } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + - " failed to parse TaskStatus bytes", e); + throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse TaskStatus bytes", e); } } else { return null; @@ -342,8 +335,7 @@ public class ByteBufferSupport { try { return Protos.ExecutorInfo.parseFrom(getBytes(bb, size)); } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + - " failed to parse ExecutorInfo bytes", e); + throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse ExecutorInfo bytes", e); } } else { return null; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java index 364a4c3..2ffcaa7 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,10 +37,10 @@ import org.apache.mesos.Protos.TaskID; import com.ebay.myriad.state.NodeTask; /** -* The purpose of this container/utility is to create a mechanism to serialize the SchedulerState -* to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an -* alternative approach. -*/ + * The purpose of this container/utility is to create a mechanism to serialize the SchedulerState + * to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an + * alternative approach. + */ public final class StoreContext { private static Pattern taskIdPattern = Pattern.compile("\\."); private ByteBuffer frameworkId; @@ -57,6 +57,7 @@ public final class StoreContext { /** * Accept all the SchedulerState maps and flatten them into lists of ByteBuffers + * * @param tasks * @param pendingTasks * @param stagingTasks @@ -64,11 +65,8 @@ public final class StoreContext { * @param lostTasks * @param killableTasks */ - public StoreContext(Protos.FrameworkID frameworkId, - Map<Protos.TaskID, NodeTask> tasks, - Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, - Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, - Set<Protos.TaskID> killableTasks) { + public StoreContext(Protos.FrameworkID frameworkId, Map<Protos.TaskID, NodeTask> tasks, Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, Set<Protos.TaskID> + killableTasks) { setFrameworkId(frameworkId); setTasks(tasks); setPendingTasks(pendingTasks); @@ -80,6 +78,7 @@ public final class StoreContext { /** * Accept list of ByteBuffers and re-create the SchedulerState maps. + * * @param framwrorkId * @param taskIds * @param taskNodes @@ -89,11 +88,8 @@ public final class StoreContext { * @param lostTasks * @param killableTasks */ - public StoreContext(ByteBuffer frameworkId, - List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, - List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, - List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, - List<ByteBuffer> killableTasks) { + public StoreContext(ByteBuffer frameworkId, List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, List<ByteBuffer> + killableTasks) { this.frameworkId = frameworkId; this.taskIds = taskIds; this.taskNodes = taskNodes; @@ -106,6 +102,7 @@ public final class StoreContext { /** * Use this to gather bytes to push to the state store + * * @return byte stream of the state store context. * @throws IOException */ @@ -131,7 +128,7 @@ public final class StoreContext { @SuppressWarnings("unchecked") public static StoreContext fromSerializedBytes(byte bytes[]) { StoreContext ctx; - if (bytes != null && bytes.length > 0){ + if (bytes != null && bytes.length > 0) { ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes); ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb); List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, bb.getInt()); @@ -141,8 +138,7 @@ public final class StoreContext { List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, - lostTasks, killableTasks); + ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, lostTasks, killableTasks); } else { ctx = new StoreContext(); } @@ -173,7 +169,7 @@ public final class StoreContext { } /** - * Serialize the Protos.FrameworkID into a ByteBuffer. + * Serialize the Protos.FrameworkID into a ByteBuffer. */ public void setFrameworkId(Protos.FrameworkID frameworkId) { if (frameworkId != null) { @@ -213,7 +209,7 @@ public final class StoreContext { } } - public Set<Protos.TaskID> getPendingTasks () { + public Set<Protos.TaskID> getPendingTasks() { return toTaskSet(pendingTasks); } @@ -263,7 +259,7 @@ public final class StoreContext { private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) { for (Protos.TaskID id : src) { - tgt.add(ByteBufferSupport.toByteBuffer(id)); + tgt.add(ByteBufferSupport.toByteBuffer(id)); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java index e618aea..a52b310 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,20 +30,20 @@ import javax.inject.Inject; */ public class HttpConnectorProvider implements Provider<Connector> { - private MyriadConfiguration myriadConf; + private MyriadConfiguration myriadConf; - @Inject - public HttpConnectorProvider(MyriadConfiguration myriadConf) { - this.myriadConf = myriadConf; - } + @Inject + public HttpConnectorProvider(MyriadConfiguration myriadConf) { + this.myriadConf = myriadConf; + } - @Override - public Connector get() { - SelectChannelConnector ret = new SelectChannelConnector(); - ret.setName("Myriad"); - ret.setHost("0.0.0.0"); - ret.setPort(myriadConf.getRestApiPort()); + @Override + public Connector get() { + SelectChannelConnector ret = new SelectChannelConnector(); + ret.setName("Myriad"); + ret.setHost("0.0.0.0"); + ret.setPort(myriadConf.getRestApiPort()); - return ret; - } + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java index 263860a..cea22d1 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,15 +31,15 @@ import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider; */ public class MyriadServletModule extends ServletModule { - @Override - protected void configureServlets() { - bind(ClustersResource.class); - bind(ConfigurationResource.class); - bind(SchedulerStateResource.class); + @Override + protected void configureServlets() { + bind(ClustersResource.class); + bind(ConfigurationResource.class); + bind(SchedulerStateResource.class); - bind(GuiceContainer.class); - bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON); + bind(GuiceContainer.class); + bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON); - serve("/api/*").with(GuiceContainer.class); - } + serve("/api/*").with(GuiceContainer.class); + } }