// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java (index 57237b6)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux;

import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.topology.*;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.storm.flux.api.TopologySource;
import org.apache.storm.flux.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Builds Storm configuration and topology objects from a parsed Flux topology definition.
 *
 * Objects described by the definition are instantiated reflectively: a compatible constructor is
 * looked up for the supplied arguments, properties are applied through setters or public fields,
 * and configuration methods are invoked afterwards.
 */
public class FluxBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);

    /**
     * Given a topology definition, return a populated `backtype.storm.Config` instance.
     *
     * @param topologyDef the topology definition whose `config` section is copied
     * @return a new `Config` containing every entry of the definition's config
     */
    public static Config buildConfig(TopologyDef topologyDef) {
        // merge contents of `config` into topology config
        Config conf = new Config();
        conf.putAll(topologyDef.getConfig());
        return conf;
    }

    /**
     * Given a topology definition, return a Storm topology that can be run either locally or remotely.
     *
     * @param context execution context holding the topology definition; built components, spouts and
     *                bolts are registered on it as a side effect
     * @return the assembled topology
     * @throws IllegalArgumentException if the topology definition fails validation
     * @throws IllegalAccessException
     * @throws InstantiationException
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     */
    static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
            InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException {

        StormTopology topology = null;
        TopologyDef topologyDef = context.getTopologyDef();

        if (!topologyDef.validate()) {
            throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
                    "defined in the same configuration as a topologySource.");
        }

        // build components that may be referenced by spouts, bolts, etc.
        // the map will be a String --> Object where the object is a fully
        // constructed class instance
        buildComponents(context);

        if (topologyDef.isDslTopology()) {
            // This is a DSL (YAML, etc.) topology...
            LOG.info("Detected DSL topology...");

            TopologyBuilder builder = new TopologyBuilder();

            // create spouts
            buildSpouts(context, builder);

            // we need to be able to lookup bolts by id, then switch based
            // on whether they are IBasicBolt or IRichBolt instances
            buildBolts(context);

            // process stream definitions
            buildStreamDefinitions(context, builder);

            topology = builder.createTopology();
        } else {
            // user class supplied...
            // this also provides a bridge to Trident...
            LOG.info("A topology source has been specified...");
            ObjectDef def = topologyDef.getTopologySource();
            topology = buildExternalTopology(def, context);
        }
        return topology;
    }

    /**
     * Given a `java.lang.Object` instance and a method name, attempt to find a method that matches the input
     * parameter: `java.util.Map` or `backtype.storm.Config`.
     *
     * Only public, single-parameter methods returning `StormTopology` are considered.
     *
     * @param topologySource object to inspect for the specified method
     * @param methodName name of the method to look for
     * @return the first matching method found
     * @throws IllegalArgumentException if no candidate method exists
     * @throws NoSuchMethodException declared for callers; not thrown by the current implementation
     */
    private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
        Class<?> clazz = topologySource.getClass();
        List<Method> candidates = new ArrayList<Method>();
        for (Method method : clazz.getMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            if (!method.getReturnType().equals(StormTopology.class)) {
                continue;
            }
            Class<?>[] paramTypes = method.getParameterTypes();
            if (paramTypes.length != 1) {
                continue;
            }
            if (paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class)) {
                candidates.add(method);
            }
        }

        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
        } else if (candidates.size() > 1) {
            LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
        }

        return candidates.get(0);
    }

    /**
     * Registers the target bolt of every stream definition with the builder and wires up its grouping.
     *
     * A bolt is registered with the builder only once, even when several streams target it; the
     * declarer is cached by bolt id and reused for subsequent streams.
     *
     * @param context execution context holding the topology definition and the instantiated bolts
     * @param builder topology builder receiving the bolts and groupings
     * @throws IllegalArgumentException if a stream targets an unknown bolt, the target is not a bolt,
     *                                  or a fields grouping has no arguments
     */
    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
            IllegalAccessException {
        TopologyDef topologyDef = context.getTopologyDef();
        // one declarer per bolt id, so a bolt fed by several streams is only set on the builder once
        Map<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();

        // process stream definitions
        for (StreamDef stream : topologyDef.getStreams()) {
            String boltId = stream.getTo();
            BoltDeclarer declarer = declarers.get(boltId);
            if (declarer == null) {
                declarer = declareBolt(boltId, context, builder);
                declarers.put(boltId, declarer);
            }

            GroupingDef grouping = stream.getGrouping();
            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());

            switch (grouping.getType()) {
                case SHUFFLE:
                    declarer.shuffleGrouping(stream.getFrom(), streamId);
                    break;
                case FIELDS:
                    if (grouping.getArgs() == null) {
                        throw new IllegalArgumentException("You must supply arguments for a fields grouping "
                                + "(stream '" + stream.getFrom() + "' --> '" + boltId + "').");
                    }
                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
                    break;
                case ALL:
                    declarer.allGrouping(stream.getFrom(), streamId);
                    break;
                case DIRECT:
                    declarer.directGrouping(stream.getFrom(), streamId);
                    break;
                case GLOBAL:
                    declarer.globalGrouping(stream.getFrom(), streamId);
                    break;
                case LOCAL_OR_SHUFFLE:
                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
                    break;
                case NONE:
                    declarer.noneGrouping(stream.getFrom(), streamId);
                    break;
                case CUSTOM:
                    declarer.customGrouping(stream.getFrom(), streamId,
                            buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
                    break;
                default:
                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
            }
        }
    }

    /**
     * Looks up the bolt registered under the given id and sets it on the builder.
     *
     * @throws IllegalArgumentException if no bolt is registered under the id, or the registered object
     *                                  is neither an IRichBolt nor an IBasicBolt
     */
    private static BoltDeclarer declareBolt(String boltId, ExecutionContext context, TopologyBuilder builder) {
        TopologyDef topologyDef = context.getTopologyDef();
        Object boltObj = context.getBolt(boltId);
        if (boltObj == null) {
            throw new IllegalArgumentException("Stream target '" + boltId + "' does not refer to a defined bolt.");
        }
        if (boltObj instanceof IRichBolt) {
            return builder.setBolt(boltId, (IRichBolt) boltObj, topologyDef.parallelismForBolt(boltId));
        }
        if (boltObj instanceof IBasicBolt) {
            return builder.setBolt(boltId, (IBasicBolt) boltObj, topologyDef.parallelismForBolt(boltId));
        }
        throw new IllegalArgumentException("Class does not appear to be a bolt: " +
                boltObj.getClass().getName());
    }

    /**
     * Applies the properties of an object definition to an instance, preferring a setter method and
     * falling back to a public field. A property with neither is logged and skipped.
     */
    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
            IllegalAccessException, InvocationTargetException {
        List<PropertyDef> props = bean.getProperties();
        if (props == null) {
            return;
        }
        Class<?> clazz = instance.getClass();
        for (PropertyDef prop : props) {
            Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
            Method setter = findSetter(clazz, prop.getName(), value);
            if (setter != null) {
                LOG.debug("found setter, attempting to invoke");
                // wrap explicitly so an array-typed value is passed as ONE argument
                setter.invoke(instance, new Object[]{value});
            } else {
                // look for a public instance variable
                LOG.debug("no setter found. Looking for a public instance variable...");
                Field field = findPublicField(clazz, prop.getName(), value);
                if (field != null) {
                    field.set(instance, value);
                }
            }
        }
    }

    /**
     * Returns the public field with the given name, or null (after logging a warning) if there is none.
     */
    private static Field findPublicField(Class<?> clazz, String property, Object arg) {
        Field field = null;
        try {
            field = clazz.getField(property);
        } catch (NoSuchFieldException e) {
            LOG.warn("Could not find setter or public variable for property: " + property, e);
        }
        return field;
    }

    /**
     * Returns a public method named like the JavaBean setter for the property, or null if there is none.
     * Only the name is compared; when several overloads exist the last one enumerated is returned.
     */
    private static Method findSetter(Class<?> clazz, String property, Object arg) {
        String setterName = toSetterName(property);
        Method retval = null;
        for (Method method : clazz.getMethods()) {
            if (setterName.equals(method.getName())) {
                LOG.debug("Found setter method: " + method.getName());
                retval = method;
            }
        }
        return retval;
    }

    /** Converts a property name such as `fooBar` to its setter name `setFooBar`. */
    private static String toSetterName(String name) {
        return "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
    }

    /**
     * Returns a copy of the argument list in which every BeanReference is replaced by the component
     * registered under its id in the execution context.
     */
    private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
        LOG.debug("Checking arguments for references.");
        List<Object> cArgs = new ArrayList<Object>(args.size());
        for (Object arg : args) {
            if (arg instanceof BeanReference) {
                cArgs.add(context.getComponent(((BeanReference) arg).getId()));
            } else {
                cArgs.add(arg);
            }
        }
        return cArgs;
    }

    /**
     * Instantiates the class named by the definition, using a compatible constructor when constructor
     * arguments are given and the no-arg constructor otherwise, then applies properties and invokes
     * configuration methods.
     *
     * @throws IllegalArgumentException if constructor arguments are given but no constructor accepts them
     */
    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
        Class<?> clazz = Class.forName(def.getClassName());
        Object obj = null;
        if (def.hasConstructorArgs()) {
            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
            List<Object> cArgs = def.getConstructorArgs();
            if (def.hasReferences()) {
                cArgs = resolveReferences(cArgs, context);
            }
            Constructor<?> con = findCompatibleConstructor(cArgs, clazz);
            if (con != null) {
                LOG.debug("Found something seemingly compatible, attempting invocation...");
                obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
            } else {
                String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
                        clazz.getName(),
                        cArgs);
                throw new IllegalArgumentException(msg);
            }
        } else {
            obj = clazz.newInstance();
        }
        applyProperties(def, obj, context);
        invokeConfigMethods(def, obj, context);
        return obj;
    }

    /**
     * Builds the user-supplied topology source object and invokes its topology method, passing either a
     * `Config` or the raw config map depending on the parameter type the method declares.
     */
    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
            InvocationTargetException {

        Object topologySource = buildObject(def, context);

        String methodName = context.getTopologyDef().getTopologySource().getMethodName();
        Method getTopology = findGetTopologyMethod(topologySource, methodName);
        if (getTopology.getParameterTypes()[0].equals(Config.class)) {
            Config config = buildConfig(context.getTopologyDef());
            return (StormTopology) getTopology.invoke(topologySource, config);
        } else {
            return (StormTopology) getTopology.invoke(topologySource, context.getTopologyDef().getConfig());
        }
    }

    /** Builds the object described by the definition and casts it to a custom stream grouping. */
    private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
            throws ClassNotFoundException,
            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
        Object grouping = buildObject(def, context);
        return (CustomStreamGrouping) grouping;
    }

    /**
     * Given a topology definition, resolve and instantiate all components found and register each one
     * on the execution context under its component id.
     */
    private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
            IllegalAccessException, InvocationTargetException, InstantiationException {
        Collection<BeanDef> cDefs = context.getTopologyDef().getComponents();
        if (cDefs != null) {
            for (BeanDef bean : cDefs) {
                Object obj = buildObject(bean, context);
                context.addComponent(bean.getId(), obj);
            }
        }
    }


    /** Builds every spout in the definition, sets it on the builder and registers it on the context. */
    private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
            NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        for (SpoutDef sd : context.getTopologyDef().getSpouts()) {
            IRichSpout spout = buildSpout(sd, context);
            builder.setSpout(sd.getId(), spout, sd.getParallelism());
            context.addSpout(sd.getId(), spout);
        }
    }

    /**
     * Given a spout definition, return a Storm spout implementation by attempting to find a matching constructor
     * in the given spout class. Perform list to array conversion as necessary.
     */
    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
        return (IRichSpout) buildObject(def, context);
    }

    /**
     * Builds every bolt in the definition and registers it on the execution context under its id.
     * Constructor arguments are coerced to a matching bolt constructor as much as possible.
     */
    private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
            InstantiationException, NoSuchMethodException, InvocationTargetException {
        for (BoltDef def : context.getTopologyDef().getBolts()) {
            // buildObject loads the class itself, so no separate Class.forName is needed here
            Object bolt = buildObject(def, context);
            context.addBolt(def.getId(), bolt);
        }
    }

    /**
     * Given a list of constructor arguments, and a target class, attempt to find a suitable constructor.
     *
     * @return the last declared constructor that can accept all arguments, or null if there is none
     */
    private static Constructor<?> findCompatibleConstructor(List<Object> args, Class<?> target) throws NoSuchMethodException {
        Constructor<?> retval = null;
        int eligibleCount = 0;

        LOG.debug("Target class: {}", target.getName());

        for (Constructor<?> con : target.getDeclaredConstructors()) {
            Class<?>[] paramClasses = con.getParameterTypes();
            if (paramClasses.length == args.size()) {
                LOG.debug("found constructor with same number of args..");
                boolean invokable = canInvokeWithArgs(args, paramClasses);
                if (invokable) {
                    retval = con;
                    eligibleCount++;
                }
                LOG.debug("** invokable --> {}", invokable);
            } else {
                LOG.debug("Skipping constructor with wrong number of arguments.");
            }
        }
        if (eligibleCount > 1) {
            LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
                    target, args);
        }
        return retval;
    }


    /**
     * Invokes each configuration method of an object definition on the given instance, resolving bean
     * references and coercing arguments to the parameter types of the matching method.
     *
     * @param bean definition listing the configuration methods
     * @param instance object on which the methods are invoked
     * @param context execution context used to resolve references
     * @throws IllegalArgumentException if no public method with a matching name and compatible
     *                                  parameters exists
     */
    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
            throws InvocationTargetException, IllegalAccessException {

        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
        if (methodDefs == null || methodDefs.isEmpty()) {
            return;
        }
        Class<?> clazz = instance.getClass();
        for (ConfigMethodDef methodDef : methodDefs) {
            List<Object> args = methodDef.getArgs();
            if (args == null) {
                // a config method declared without arguments is a zero-argument call
                args = Collections.emptyList();
            }
            if (methodDef.hasReferences()) {
                args = resolveReferences(args, context);
            }
            String methodName = methodDef.getName();
            Method method = findCompatibleMethod(args, clazz, methodName);
            if (method != null) {
                Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
                method.invoke(instance, methodArgs);
            } else {
                String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
                        methodName, clazz.getName(), args);
                throw new IllegalArgumentException(msg);
            }
        }
    }

    /**
     * Finds a public method with the given name that can be invoked with the given arguments.
     *
     * @return the last matching method enumerated, or null if there is none
     */
    private static Method findCompatibleMethod(List<Object> args, Class<?> target, String methodName) {
        Method retval = null;
        int eligibleCount = 0;

        LOG.debug("Target class: {}", target.getName());

        for (Method method : target.getMethods()) {
            Class<?>[] paramClasses = method.getParameterTypes();
            if (paramClasses.length == args.size() && method.getName().equals(methodName)) {
                LOG.debug("found method with same name and number of args..");
                boolean invokable = canInvokeWithArgs(args, paramClasses);
                if (invokable) {
                    retval = method;
                    eligibleCount++;
                }
                LOG.debug("** invokable --> {}", invokable);
            } else {
                LOG.debug("Skipping method with different name or wrong number of arguments.");
            }
        }
        if (eligibleCount > 1) {
            LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
                            "Using the last one found.",
                    new Object[]{target, methodName, args});
        }
        return retval;
    }

    /**
     * Given a java.util.List of constructor/method arguments, and a list of parameter types, attempt to convert the
     * list to a java.lang.Object array that can be used to invoke the constructor or method. If an argument
     * needs to be coerced from a List to an Array, do so.
     *
     * @throws IllegalArgumentException if the argument count differs from the parameter count, or an
     *                                  argument cannot be converted to its parameter type
     */
    private static Object[] getArgsWithListCoercian(List<Object> args, Class<?>[] parameterTypes) {
        if (parameterTypes.length != args.size()) {
            throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
        }
        Object[] constructorParams = new Object[args.size()];
        for (int i = 0; i < args.size(); i++) {
            constructorParams[i] = coerceArgument(args.get(i), parameterTypes[i]);
        }
        return constructorParams;
    }

    /**
     * Converts a single argument to a value that reflection can pass for the given parameter type:
     * numbers are narrowed/widened to primitive numeric parameters, Strings become enum constants and
     * Lists become arrays. Anything already assignable is returned unchanged.
     */
    private static Object coerceArgument(Object arg, Class<?> paramType) {
        if (arg == null) {
            if (paramType.isPrimitive()) {
                throw new IllegalArgumentException("Cannot pass null for primitive parameter type " + paramType.getName());
            }
            return null;
        }
        Class<?> objectType = arg.getClass();
        LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
                paramType, objectType);
        if (paramType.isAssignableFrom(objectType)) {
            LOG.debug("Assignment is possible.");
            return arg;
        }
        if (paramType == Boolean.TYPE && arg instanceof Boolean) {
            // reflection unboxes the Boolean when invoking
            return arg;
        }
        if (isPrimitiveNumber(paramType) && arg instanceof Number) {
            LOG.debug("Its a primitive number.");
            return toPrimitiveNumber((Number) arg, paramType);
        }
        if (paramType.isEnum() && objectType.equals(String.class)) {
            LOG.debug("Yes, will convert a String to enum");
            return toEnum(paramType, (String) arg);
        }
        if (paramType.isArray() && arg instanceof List) {
            // TODO more collection content type checking
            LOG.debug("Conversion appears possible...");
            List<?> list = (List<?>) arg;
            Object array = toArray(list, paramType.getComponentType());
            LOG.debug("After conversion: {}", array);
            return array;
        }
        throw new IllegalArgumentException("Cannot convert argument of type " + objectType.getName()
                + " to parameter type " + paramType.getName());
    }

    /** Converts a Number to the boxed form of the requested primitive numeric type. */
    private static Object toPrimitiveNumber(Number num, Class<?> primitiveType) {
        if (primitiveType == Float.TYPE) {
            return num.floatValue();
        } else if (primitiveType == Double.TYPE) {
            return num.doubleValue();
        } else if (primitiveType == Long.TYPE) {
            return num.longValue();
        } else if (primitiveType == Integer.TYPE) {
            return num.intValue();
        } else if (primitiveType == Short.TYPE) {
            return num.shortValue();
        } else if (primitiveType == Byte.TYPE) {
            return num.byteValue();
        }
        return num;
    }

    /** Looks up the enum constant with the given name; the caller guarantees enumType.isEnum(). */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Object toEnum(Class<?> enumType, String name) {
        return Enum.valueOf((Class<Enum>) enumType, name);
    }

    /** Copies the list into a newly created array with the given component type. */
    private static Object toArray(List<?> list, Class<?> componentType) {
        Object array = Array.newInstance(componentType, list.size());
        for (int j = 0; j < list.size(); j++) {
            Array.set(array, j, list.get(j));
        }
        return array;
    }


    /**
     * Determine if the given constructor/method parameter types are compatible with the given arguments List.
     * Consider if list coercion can make it possible.
     *
     * Every argument must be compatible with the parameter at the same position; an empty argument
     * list is compatible with an empty parameter list.
     *
     * @param args the arguments that would be passed
     * @param parameterTypes the declared parameter types
     * @return true if all arguments can be passed, possibly after coercion
     */
    private static boolean canInvokeWithArgs(List<Object> args, Class<?>[] parameterTypes) {
        if (parameterTypes.length != args.size()) {
            LOG.warn("parameter types were the wrong size");
            return false;
        }

        for (int i = 0; i < args.size(); i++) {
            if (!isCompatible(args.get(i), parameterTypes[i])) {
                return false;
            }
        }
        return true;
    }

    /** Returns true if the argument can be passed for the parameter type, directly or after coercion. */
    private static boolean isCompatible(Object arg, Class<?> paramType) {
        if (arg == null) {
            // null can be passed for any reference type, never for a primitive
            return !paramType.isPrimitive();
        }
        Class<?> objectType = arg.getClass();
        LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
                paramType, objectType);
        if (paramType.isAssignableFrom(objectType)) {
            LOG.debug("Yes, assignment is possible.");
            return true;
        }
        if (paramType == Boolean.TYPE && objectType == Boolean.class) {
            LOG.debug("Yes, assignment is possible.");
            return true;
        }
        if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) {
            return true;
        }
        if (paramType.isEnum() && objectType.equals(String.class)) {
            LOG.debug("Yes, will convert a String to enum");
            return true;
        }
        if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
            // TODO more collection content type checking
            LOG.debug("Assignment is possible if we convert a List to an array.");
            return true;
        }
        return false;
    }

    /**
     * Returns true if the class is one of the primitive numeric types
     * (byte, short, int, long, float or double).
     *
     * @param clazz the class to test
     * @return true for primitive numeric types; false for everything else, including boolean, char and void
     */
    public static boolean isPrimitiveNumber(Class<?> clazz) {
        return clazz == Byte.TYPE
                || clazz == Short.TYPE
                || clazz == Integer.TYPE
                || clazz == Long.TYPE
                || clazz == Float.TYPE
                || clazz == Double.TYPE;
    }
}
// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java (index fbccfb7)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.api;


import backtype.storm.generated.StormTopology;

import java.util.Map;

/**
 * Interface for objects that can produce `StormTopology` objects.
 *
 * If a `topology-source` class implements the `getTopology()` method, Flux will
 * call that method. Otherwise, it will introspect the given class and look for a
 * similar method that produces a `StormTopology` instance.
 *
 * Note that it is not strictly necessary for a class to implement this interface.
 * If a class defines a method with a similar signature, Flux should be able to find
 * and invoke it.
 *
 */
public interface TopologySource {
    /**
     * Produces a topology from the given configuration.
     *
     * @param config the topology configuration
     * @return the topology to run
     */
    public StormTopology getTopology(Map<String, Object> config);
}

// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java (index 72ca5ae)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.model;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * A representation of a Java object that is uniquely identifiable, and given a className, constructor arguments,
 * and properties, can be instantiated.
 */
public class BeanDef extends ObjectDef {
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java (index bd236f1)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.model;

/**
 * A bean reference is simply a string pointer to another id.
 */
public class BeanReference {
    // kept public for backward compatibility with code that reads the field directly
    public String id;

    public BeanReference(){}

    public BeanReference(String id){
        this.id = id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java (index 362abf1)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.model;

/**
 * Bean representation of a Storm bolt.
 */
public class BoltDef extends VertexDef {
}

// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java (index 6f7e4d4)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.model;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Bean representation of a configuration method: a method name plus the arguments to invoke it with.
 * Arguments written as a single-entry `{ref: id}` map are stored as {@link BeanReference}s.
 */
public class ConfigMethodDef {
    private String name;
    private List<Object> args;
    private boolean hasReferences = false;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public List<Object> getArgs() {
        return args;
    }

    /**
     * Stores a copy of the arguments, replacing each map whose only key is `ref` with a
     * {@link BeanReference} to that id. Every other argument, including maps that are not
     * references, is kept unchanged. A null list is stored as an empty list.
     *
     * @param args the raw arguments, may be null
     */
    public void setArgs(List<Object> args) {
        List<Object> newVal = new ArrayList<Object>();
        boolean foundReference = false;
        if (args != null) {
            for (Object obj : args) {
                if (obj instanceof Map) {
                    Map map = (Map) obj;
                    if (map.size() == 1 && map.containsKey("ref")) {
                        newVal.add(new BeanReference((String) map.get("ref")));
                        foundReference = true;
                        continue;
                    }
                }
                // plain values and ordinary maps are passed through as-is
                newVal.add(obj);
            }
        }
        this.args = newVal;
        // recomputed on every call so the flag always describes the current argument list
        this.hasReferences = foundReference;
    }

    public boolean hasReferences(){
        return this.hasReferences;
    }
}

// http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
// new file: external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java (index e94b887)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.flux.model;

import backtype.storm.Config;
import backtype.storm.task.IBolt;
import backtype.storm.topology.IRichSpout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Container for all the objects required to instantiate a topology.
 */
public class ExecutionContext {
    // parsed Topology definition
    TopologyDef topologyDef;

    // Storm config
    private Config config;

    // components (not populated by this class)
    private List<Object> components;
    // indexed by id
    private Map<String, Object> componentMap = new HashMap<String, Object>();

    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();

    // bolts (not populated by this class)
    private List<IBolt> bolts;
    // indexed by id
    private Map<String, Object> boltMap = new HashMap<String, Object>();

    public ExecutionContext(TopologyDef topologyDef, Config config){
        this.topologyDef = topologyDef;
        this.config = config;
    }

    public TopologyDef getTopologyDef(){
        return this.topologyDef;
    }

    public void addSpout(String id, IRichSpout spout){
        this.spoutMap.put(id, spout);
    }

    public void addBolt(String id, Object bolt){
        this.boltMap.put(id, bolt);
    }

    /** Returns the bolt registered under the id, or null if there is none. */
    public Object getBolt(String id){
        return this.boltMap.get(id);
    }

    public void addComponent(String id, Object value){
        this.componentMap.put(id, value);
    }

    /** Returns the component registered under the id, or null if there is none. */
    public Object getComponent(String id){
        return this.componentMap.get(id);
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java index 0000000,0000000..e4fac8e new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java @@@ -1,0 -1,0 +1,77 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++import java.util.List; ++ ++/** ++ * Bean representation of a Storm stream grouping. 
++ */ ++public class GroupingDef { ++ ++ /** ++ * Types of stream groupings Storm allows ++ */ ++ public static enum Type { ++ ALL, ++ CUSTOM, ++ DIRECT, ++ SHUFFLE, ++ LOCAL_OR_SHUFFLE, ++ FIELDS, ++ GLOBAL, ++ NONE ++ } ++ ++ private Type type; ++ private String streamId; ++ private List<String> args; ++ private ObjectDef customClass; ++ ++ public List<String> getArgs() { ++ return args; ++ } ++ ++ public void setArgs(List<String> args) { ++ this.args = args; ++ } ++ ++ public Type getType() { ++ return type; ++ } ++ ++ public void setType(Type type) { ++ this.type = type; ++ } ++ ++ public String getStreamId() { ++ return streamId; ++ } ++ ++ public void setStreamId(String streamId) { ++ this.streamId = streamId; ++ } ++ ++ public ObjectDef getCustomClass() { ++ return customClass; ++ } ++ ++ public void setCustomClass(ObjectDef customClass) { ++ this.customClass = customClass; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java index 0000000,0000000..23fd9d2 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java @@@ -1,0 -1,0 +1,54 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
/**
 * Represents an include. Includes can be either a file or a classpath resource.
 *
 * If an include is marked as `override=true` then existing properties will be replaced.
 *
 */
public class IncludeDef {
    // true when the include is a classpath resource rather than a file
    private boolean resource = false;
    // true when values from the include replace values that already exist
    private boolean override = false;
    // location of the include
    private String file;

    public boolean isResource() {
        return resource;
    }

    public void setResource(boolean resource) {
        this.resource = resource;
    }

    public String getFile() {
        return file;
    }

    public void setFile(String file) {
        this.file = file;
    }

    public boolean isOverride() {
        return override;
    }

    public void setOverride(boolean override) {
        this.override = override;
    }
}
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++import backtype.storm.Config; ++ ++import java.util.ArrayList; ++import java.util.LinkedHashMap; ++import java.util.List; ++import java.util.Map; ++ ++/** ++ * A representation of a Java object that given a className, constructor arguments, ++ * and properties, can be instantiated. ++ */ ++public class ObjectDef { ++ private String className; ++ private List<Object> constructorArgs; ++ private boolean hasReferences; ++ private List<PropertyDef> properties; ++ private List<ConfigMethodDef> configMethods; ++ ++ public String getClassName() { ++ return className; ++ } ++ ++ public void setClassName(String className) { ++ this.className = className; ++ } ++ ++ public List<Object> getConstructorArgs() { ++ return constructorArgs; ++ } ++ ++ public void setConstructorArgs(List<Object> constructorArgs) { ++ ++ List<Object> newVal = new ArrayList<Object>(); ++ for(Object obj : constructorArgs){ ++ if(obj instanceof LinkedHashMap){ ++ Map map = (Map)obj; ++ if(map.containsKey("ref") && map.size() == 1){ ++ newVal.add(new BeanReference((String)map.get("ref"))); ++ this.hasReferences = true; ++ } ++ } else { ++ newVal.add(obj); ++ } ++ } ++ this.constructorArgs = newVal; ++ } ++ ++ public boolean hasConstructorArgs(){ ++ return this.constructorArgs != null && this.constructorArgs.size() > 0; ++ } ++ ++ public boolean hasReferences(){ ++ 
return this.hasReferences; ++ } ++ ++ public List<PropertyDef> getProperties() { ++ return properties; ++ } ++ ++ public void setProperties(List<PropertyDef> properties) { ++ this.properties = properties; ++ } ++ ++ public List<ConfigMethodDef> getConfigMethods() { ++ return configMethods; ++ } ++ ++ public void setConfigMethods(List<ConfigMethodDef> configMethods) { ++ this.configMethods = configMethods; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java index 0000000,0000000..f3d7704 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java @@@ -1,0 -1,0 +1,58 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
/**
 * Bean holding a property name together with either a literal value or a reference id.
 *
 * A value and a reference are mutually exclusive: setting one while the other is already
 * non-null raises an {@link IllegalStateException}.
 */
public class PropertyDef {
    private String name;
    private Object value;
    private String ref;

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Object getValue() {
        return this.value;
    }

    /**
     * @throws IllegalStateException if a reference has already been set on this property
     */
    public void setValue(Object value) {
        failIfPresent(this.ref);
        this.value = value;
    }

    public String getRef() {
        return this.ref;
    }

    /**
     * @throws IllegalStateException if a value has already been set on this property
     */
    public void setRef(String ref) {
        failIfPresent(this.value);
        this.ref = ref;
    }

    /**
     * @return {@code true} when a reference id has been set
     */
    public boolean isReference() {
        return null != this.ref;
    }

    // Enforces the value-XOR-reference rule: rejects the update when the opposite slot is occupied.
    private static void failIfPresent(Object opposite) {
        if (opposite == null) {
            return;
        }
        throw new IllegalStateException("A property can only have a value OR a reference, not both.");
    }
}
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++/** ++ * Bean representation of a Storm spout. ++ */ ++public class SpoutDef extends VertexDef { ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java index 0000000,0000000..da80f1c new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java @@@ -1,0 -1,0 +1,64 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.storm.flux.model; ++ ++/** ++ * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG). ++ * ++ * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`. ++ * ++ */ ++public class StreamDef { ++ ++ private String name; // not used, placeholder for GUI, etc. ++ private String from; ++ private String to; ++ private GroupingDef grouping; ++ ++ public String getTo() { ++ return to; ++ } ++ ++ public void setTo(String to) { ++ this.to = to; ++ } ++ ++ public String getName() { ++ return name; ++ } ++ ++ public void setName(String name) { ++ this.name = name; ++ } ++ ++ public String getFrom() { ++ return from; ++ } ++ ++ public void setFrom(String from) { ++ this.from = from; ++ } ++ ++ public GroupingDef getGrouping() { ++ return grouping; ++ } ++ ++ public void setGrouping(GroupingDef grouping) { ++ this.grouping = grouping; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java index 0000000,0000000..a6ae450 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java @@@ -1,0 -1,0 +1,216 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import java.util.*; ++ ++/** ++ * Bean represenation of a topology. ++ * ++ * It consists of the following: ++ * 1. The topology name ++ * 2. A `java.util.Map` representing the `backtype.storm.config` for the topology ++ * 3. A list of spout definitions ++ * 4. A list of bolt definitions ++ * 5. A list of stream definitions that define the flow between spouts and bolts. ++ * ++ */ ++public class TopologyDef { ++ private static Logger LOG = LoggerFactory.getLogger(TopologyDef.class); ++ ++ private String name; ++ private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required ++ private List<IncludeDef> includes; // not required ++ private Map<String, Object> config = new HashMap<String, Object>(); ++ ++ // a "topology source" is a class that can produce a `StormTopology` thrift object. ++ private TopologySourceDef topologySource; ++ ++ // the following are required if we're defining a core storm topology DAG in YAML, etc. 
++ private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>(); ++ private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>(); ++ private List<StreamDef> streams = new ArrayList<StreamDef>(); ++ ++ ++ public String getName() { ++ return name; ++ } ++ ++ public void setName(String name) { ++ this.name = name; ++ } ++ ++ public void setName(String name, boolean override){ ++ if(this.name == null || override){ ++ this.name = name; ++ } else { ++ LOG.warn("Ignoring attempt to set property 'name' with override == false."); ++ } ++ } ++ ++ public List<SpoutDef> getSpouts() { ++ ArrayList<SpoutDef> retval = new ArrayList<SpoutDef>(); ++ retval.addAll(this.spoutMap.values()); ++ return retval; ++ } ++ ++ public void setSpouts(List<SpoutDef> spouts) { ++ this.spoutMap = new LinkedHashMap<String, SpoutDef>(); ++ for(SpoutDef spout : spouts){ ++ this.spoutMap.put(spout.getId(), spout); ++ } ++ } ++ ++ public List<BoltDef> getBolts() { ++ ArrayList<BoltDef> retval = new ArrayList<BoltDef>(); ++ retval.addAll(this.boltMap.values()); ++ return retval; ++ } ++ ++ public void setBolts(List<BoltDef> bolts) { ++ this.boltMap = new LinkedHashMap<String, BoltDef>(); ++ for(BoltDef bolt : bolts){ ++ this.boltMap.put(bolt.getId(), bolt); ++ } ++ } ++ ++ public List<StreamDef> getStreams() { ++ return streams; ++ } ++ ++ public void setStreams(List<StreamDef> streams) { ++ this.streams = streams; ++ } ++ ++ public Map<String, Object> getConfig() { ++ return config; ++ } ++ ++ public void setConfig(Map<String, Object> config) { ++ this.config = config; ++ } ++ ++ public List<BeanDef> getComponents() { ++ ArrayList<BeanDef> retval = new ArrayList<BeanDef>(); ++ retval.addAll(this.componentMap.values()); ++ return retval; ++ } ++ ++ public void setComponents(List<BeanDef> components) { ++ this.componentMap = new LinkedHashMap<String, BeanDef>(); ++ for(BeanDef component : components){ ++ this.componentMap.put(component.getId(), component); ++ } ++ } ++ ++ 
public List<IncludeDef> getIncludes() { ++ return includes; ++ } ++ ++ public void setIncludes(List<IncludeDef> includes) { ++ this.includes = includes; ++ } ++ ++ // utility methods ++ public int parallelismForBolt(String boltId){ ++ return this.boltMap.get(boltId).getParallelism(); ++ } ++ ++ public BoltDef getBoltDef(String id){ ++ return this.boltMap.get(id); ++ } ++ ++ public SpoutDef getSpoutDef(String id){ ++ return this.spoutMap.get(id); ++ } ++ ++ public BeanDef getComponent(String id){ ++ return this.componentMap.get(id); ++ } ++ ++ // used by includes implementation ++ public void addAllBolts(List<BoltDef> bolts, boolean override){ ++ for(BoltDef bolt : bolts){ ++ String id = bolt.getId(); ++ if(this.boltMap.get(id) == null || override) { ++ this.boltMap.put(bolt.getId(), bolt); ++ } else { ++ LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id); ++ } ++ } ++ } ++ ++ public void addAllSpouts(List<SpoutDef> spouts, boolean override){ ++ for(SpoutDef spout : spouts){ ++ String id = spout.getId(); ++ if(this.spoutMap.get(id) == null || override) { ++ this.spoutMap.put(spout.getId(), spout); ++ } else { ++ LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id); ++ } ++ } ++ } ++ ++ public void addAllComponents(List<BeanDef> components, boolean override) { ++ for(BeanDef bean : components){ ++ String id = bean.getId(); ++ if(this.componentMap.get(id) == null || override) { ++ this.componentMap.put(bean.getId(), bean); ++ } else { ++ LOG.warn("Ignoring attempt to create component '{}' with override == false.", id); ++ } ++ } ++ } ++ ++ public void addAllStreams(List<StreamDef> streams, boolean override) { ++ //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other ++ // properties. For now we just add them blindly which could lead to a potentially invalid topology. 
++ this.streams.addAll(streams); ++ } ++ ++ public TopologySourceDef getTopologySource() { ++ return topologySource; ++ } ++ ++ public void setTopologySource(TopologySourceDef topologySource) { ++ this.topologySource = topologySource; ++ } ++ ++ public boolean isDslTopology(){ ++ return this.topologySource == null; ++ } ++ ++ ++ public boolean validate(){ ++ boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0; ++ boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0; ++ boolean hasStreams = this.streams != null && this.streams.size() > 0; ++ boolean hasSpoutsBoltsStreams = hasStreams && hasBolts && hasSpouts; ++ // you cant define a topologySource and a DSL topology at the same time... ++ if (!isDslTopology() && ((hasSpouts || hasBolts || hasStreams))) { ++ return false; ++ } ++ if(isDslTopology() && (hasSpouts && hasBolts && hasStreams)) { ++ return true; ++ } ++ return true; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java index 0000000,0000000..d6a2f57 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java @@@ -1,0 -1,0 +1,36 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++public class TopologySourceDef extends ObjectDef { ++ public static final String DEFAULT_METHOD_NAME = "getTopology"; ++ ++ private String methodName; ++ ++ public TopologySourceDef(){ ++ this.methodName = DEFAULT_METHOD_NAME; ++ } ++ ++ public String getMethodName() { ++ return methodName; ++ } ++ ++ public void setMethodName(String methodName) { ++ this.methodName = methodName; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java index 0000000,0000000..e71bcc2 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java @@@ -1,0 -1,0 +1,36 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.model; ++ ++/** ++ * Abstract parent class of component definitions ++ * (spouts/bolts) ++ */ ++public abstract class VertexDef extends BeanDef { ++ ++ // default parallelism to 1 so if it's ommitted, the topology will still function. ++ private int parallelism = 1; ++ ++ public int getParallelism() { ++ return parallelism; ++ } ++ ++ public void setParallelism(int parallelism) { ++ this.parallelism = parallelism; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 0000000,0000000..72f8a8e new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@@ -1,0 -1,0 +1,202 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.parser; ++ ++import org.apache.storm.flux.api.TopologySource; ++import org.apache.storm.flux.model.BoltDef; ++import org.apache.storm.flux.model.IncludeDef; ++import org.apache.storm.flux.model.SpoutDef; ++import org.apache.storm.flux.model.TopologyDef; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++import org.yaml.snakeyaml.TypeDescription; ++import org.yaml.snakeyaml.Yaml; ++import org.yaml.snakeyaml.constructor.Constructor; ++ ++import java.io.ByteArrayOutputStream; ++import java.io.FileInputStream; ++import java.io.IOException; ++import java.io.InputStream; ++import java.nio.ByteBuffer; ++import java.util.Map; ++import java.util.Properties; ++ ++public class FluxParser { ++ private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class); ++ ++ private FluxParser(){} ++ ++ // TODO refactor input stream processing (see parseResource() method). ++ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, ++ String propertiesFile, boolean envSub) throws IOException { ++ Yaml yaml = yaml(); ++ FileInputStream in = new FileInputStream(inputFile); ++ // TODO process properties, etc. 
++ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); ++ in.close(); ++ if(dumpYaml){ ++ dumpYaml(topology, yaml); ++ } ++ if(processIncludes) { ++ return processIncludes(yaml, topology, propertiesFile, envSub); ++ } else { ++ return topology; ++ } ++ } ++ ++ public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, ++ String propertiesFile, boolean envSub) throws IOException { ++ Yaml yaml = yaml(); ++ InputStream in = FluxParser.class.getResourceAsStream(resource); ++ if(in == null){ ++ LOG.error("Unable to load classpath resource: " + resource); ++ System.exit(1); ++ } ++ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); ++ in.close(); ++ if(dumpYaml){ ++ dumpYaml(topology, yaml); ++ } ++ if(processIncludes) { ++ return processIncludes(yaml, topology, propertiesFile, envSub); ++ } else { ++ return topology; ++ } ++ } ++ ++ private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException { ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ LOG.info("loading YAML from input stream..."); ++ int b = -1; ++ while((b = in.read()) != -1){ ++ bos.write(b); ++ } ++ ++ // TODO substitution implementation is not exactly efficient or kind to memory... 
++ String str = bos.toString(); ++ // properties file substitution ++ if(propsFile != null){ ++ LOG.info("Performing property substitution."); ++ InputStream propsIn = new FileInputStream(propsFile); ++ Properties props = new Properties(); ++ props.load(propsIn); ++ for(Object key : props.keySet()){ ++ str = str.replace("${" + key + "}", props.getProperty((String)key)); ++ } ++ } else { ++ LOG.info("Not performing property substitution."); ++ } ++ ++ // environment variable substitution ++ if(envSubstitution){ ++ LOG.info("Performing environment variable substitution..."); ++ Map<String, String> envs = System.getenv(); ++ for(String key : envs.keySet()){ ++ str = str.replace("${ENV-" + key + "}", envs.get(key)); ++ } ++ } else { ++ LOG.info("Not performing environment variable substitution."); ++ } ++ return (TopologyDef)yaml.load(str); ++ } ++ ++ private static void dumpYaml(TopologyDef topology, Yaml yaml){ ++ System.out.println("Configuration (interpreted): \n" + yaml.dump(topology)); ++ } ++ ++ private static Yaml yaml(){ ++ Constructor constructor = new Constructor(TopologyDef.class); ++ ++ TypeDescription topologyDescription = new TypeDescription(TopologyDef.class); ++ topologyDescription.putListPropertyType("spouts", SpoutDef.class); ++ topologyDescription.putListPropertyType("bolts", BoltDef.class); ++ topologyDescription.putListPropertyType("includes", IncludeDef.class); ++ constructor.addTypeDescription(topologyDescription); ++ ++ Yaml yaml = new Yaml(constructor); ++ return yaml; ++ } ++ ++ /** ++ * ++ * @param yaml the yaml parser for parsing the include file(s) ++ * @param topologyDef the topology definition containing (possibly zero) includes ++ * @return The TopologyDef with includes resolved. 
++ */ ++ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub) ++ throws IOException { ++ //TODO support multiple levels of includes ++ if(topologyDef.getIncludes() != null) { ++ for (IncludeDef include : topologyDef.getIncludes()){ ++ TopologyDef includeTopologyDef = null; ++ if (include.isResource()) { ++ LOG.info("Loading includes from resource: {}", include.getFile()); ++ includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub); ++ } else { ++ LOG.info("Loading includes from file: {}", include.getFile()); ++ includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub); ++ } ++ ++ // if overrides are disabled, we won't replace anything that already exists ++ boolean override = include.isOverride(); ++ // name ++ if(includeTopologyDef.getName() != null){ ++ topologyDef.setName(includeTopologyDef.getName(), override); ++ } ++ ++ // config ++ if(includeTopologyDef.getConfig() != null) { ++ //TODO move this logic to the model class ++ Map<String, Object> config = topologyDef.getConfig(); ++ Map<String, Object> includeConfig = includeTopologyDef.getConfig(); ++ if(override) { ++ config.putAll(includeTopologyDef.getConfig()); ++ } else { ++ for(String key : includeConfig.keySet()){ ++ if(config.containsKey(key)){ ++ LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key); ++ } ++ else { ++ config.put(key, includeConfig.get(key)); ++ } ++ } ++ } ++ } ++ ++ //component overrides ++ if(includeTopologyDef.getComponents() != null){ ++ topologyDef.addAllComponents(includeTopologyDef.getComponents(), override); ++ } ++ //bolt overrides ++ if(includeTopologyDef.getBolts() != null){ ++ topologyDef.addAllBolts(includeTopologyDef.getBolts(), override); ++ } ++ //spout overrides ++ if(includeTopologyDef.getSpouts() != null) { ++ topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override); ++ } ++ //stream overrides ++ //TODO 
streams should be uniquely identifiable ++ if(includeTopologyDef.getStreams() != null) { ++ topologyDef.addAllStreams(includeTopologyDef.getStreams(), override); ++ } ++ } // end include processing ++ } ++ return topologyDef; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/resources/splash.txt ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/main/resources/splash.txt index 0000000,0000000..337931a new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/main/resources/splash.txt @@@ -1,0 -1,0 +1,9 @@@ ++âââââââââââ âââ ââââââ âââ ++âââââââââââ âââ âââââââââââ ++ââââââ âââ âââ âââ ââââââ ++ââââââ âââ âââ âââ ââââââ ++âââ âââââââââââââââââââââ âââ ++âââ ââââââââ âââââââ âââ âââ +++- Apache Storm -+ +++- data FLow User eXperience -+ ++Version: ${project.version} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java index 0000000,0000000..ff67867 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java @@@ -1,0 -1,0 +1,31 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux; ++ ++import org.junit.Test; ++import static org.junit.Assert.*; ++ ++
/**
 * Unit tests for the static helper methods of {@link FluxBuilder}.
 */
public class FluxBuilderTest {

    /**
     * {@code isPrimitiveNumber} is expected to accept {@code int.class} and to
     * reject {@code boolean.class} and {@code String.class}.
     */
    @Test
    public void testIsPrimitiveNumber() throws Exception {
        final boolean intAccepted = FluxBuilder.isPrimitiveNumber(int.class);
        assertTrue(intAccepted);

        final boolean booleanAccepted = FluxBuilder.isPrimitiveNumber(boolean.class);
        assertFalse(booleanAccepted);

        final boolean stringAccepted = FluxBuilder.isPrimitiveNumber(String.class);
        assertFalse(stringAccepted);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java index 0000000,0000000..5e17f5e new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java @@@ -1,0 -1,0 +1,41 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License.
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux; ++ ++import org.junit.Test; ++ ++
/**
 * Integration test that launches a topology through {@link Flux#main}.
 *
 * The test is disabled by default. It only runs when the JVM system property
 * {@code skipIntegration} is set to {@code false} (case-insensitive), e.g.
 * {@code -DskipIntegration=false}.
 */
public class IntegrationTest {

    /**
     * {@code true} unless the {@code skipIntegration} system property equals
     * "false" ignoring case. An unset property therefore means "skip".
     */
    private static final boolean SKIP_TEST =
            !"false".equalsIgnoreCase(System.getProperty("skipIntegration"));

    /**
     * Runs {@code Flux.main} against the existing-topology test configuration.
     *
     * When integration tests are disabled the test is reported as skipped
     * (assumption not met) instead of silently passing without doing anything.
     */
    @Test
    public void testRunTopologySource() throws Exception {
        // Fully qualified so the file's import list does not need to change.
        org.junit.Assume.assumeTrue(!SKIP_TEST);

        Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
    }
}