*TODO*
✓ Typesafe API
✓ groupByKey
☐ Test compatibility for old applications
☐ Refactor package to separate core/storm/spark/others
☐ Support KeyValue-based grouping for old applications and the existing executor/spout
☐ Support groupByKey(fields: String*)
☐ Refactor Alert Executor with groupByKey and StreamInfo, avoiding passing upStreamNames
☐ Siddhi support as a general engine: sql(String)
☐ Refactor Hdfs Schema Manifest (high coupling for developers now)
☐ State management: DAG flow, performance metrics, visualization
☐ Java-compatible interface for function-type parameters such as mapper, filter, and so on, similar to keySelector
☐ Refactor configuration style (maybe Qingwen could help with it)
☐ DAG serialization/deserialization (function serialization? JSON -> function? only support SQL in the first phase)
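For the Java-compatible-interface TODO item, one possible shape is to pair each Scala function parameter with a SAM-style interface and overload the DSL operators. This is only an illustrative sketch: JavaMapper, JavaFilter, and the Stream class here are hypothetical names, not the actual Eagle API.

```scala
// Hypothetical Java-friendly functional interfaces, mirroring Scala's
// T => R and T => Boolean so Java callers can pass anonymous classes.
trait JavaMapper[T, R] extends Serializable {
  def map(t: T): R
}
trait JavaFilter[T] extends Serializable {
  def accept(t: T): Boolean
}

// A simplified stand-in for the stream DSL: each operator is overloaded to
// accept either a Scala function or the Java-style interface, and the
// Java-style form is converted to a plain function internally.
class Stream[T](val items: Seq[T]) {
  def map[R](fn: T => R): Stream[R] = new Stream(items.map(fn))
  def map[R](mapper: JavaMapper[T, R]): Stream[R] = map(mapper.map _)
  def filter(fn: T => Boolean): Stream[T] = new Stream(items.filter(fn))
  def filter(f: JavaFilter[T]): Stream[T] = filter(f.accept _)
}

// Java-style call site: no scala.Function1 involved.
val doubled = new Stream(Seq(1, 2, 3))
  .map(new JavaMapper[Int, Int] { def map(t: Int): Int = t * 2 })
```

This keeps a single Scala implementation while letting Java applications avoid Scala function types entirely.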
Regards,
Hao

On Mon, Dec 7, 2015 at 3:47 PM, Hao Chen <h...@apache.org> wrote:

JIRA: https://issues.apache.org/jira/browse/EAGLE-66
Pull Request: https://github.com/apache/incubator-eagle/pull/17

*Main Changes*

1. *Typesafe API:* Currently the stream processing API is not type-safe (type information is erased by the stream framework), so every programming interface exposes Object/AnyRef to the framework user (application developer), which is neither friendly nor extensible:

    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)

So I propose the following interface, where all type information is transparent to the developer, with no additional parameters needed, supported by Scala's implicit TypeTag:

    class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]

- *StreamInfo:* contains the stream's core information, including streamId, processing element id/name, and entity type info (class/TypeTag)
- *StreamProtocol extends JavaStreamProtocol:* contains the basic stream DSL API and the Java-compatible API

    class StreamInfo extends Serializable {
      val id: Int = UniqueId.incrementAndGetId()
      var name: String = null
      var streamId: String = null
      var parallelismNum: Int = 1
      var inKeyed: Boolean = false
      var outKeyed: Boolean = false
      var keySelector: KeySelector = null

      var typeClass: Class[_] = classOf[AnyRef]
      @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
    }
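The TypeTag mechanics can be illustrated with a standalone sketch, simplified from the classes above (UniqueId and the other StreamInfo fields are omitted, and the real StreamProducer signature differs):

```scala
import scala.reflect.runtime.{universe => ru}

// Simplified StreamInfo: carries the element type as both a Class and a
// TypeTag so it remains available at runtime despite JVM erasure.
class StreamInfo extends Serializable {
  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}

// The compiler materializes the implicit TypeTag[T] at each call site, so
// the concrete element type is captured without any extra user-supplied
// parameters.
class StreamProducer[+T](implicit tag: ru.TypeTag[T]) extends StreamInfo {
  typeTag = tag
  typeClass = ru.runtimeMirror(getClass.getClassLoader).runtimeClass(tag.tpe)
}

case class Entity(name: String, value: Double)

// typeClass recovers the concrete Entity class, even though the generic
// parameter itself is erased in the bytecode.
val producer = new StreamProducer[Entity]
```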
2. *KeyValue-Based Structure:* Currently framework users (developers) have to repeat field declarations again and again, so framework and business logic are highly coupled. Per the StreamProtocol description, users should not need to care about framework-level details such as Storm's internal data structure (List<Object> with Fields<String>), which is not developer-friendly; we should make sure users focus on business logic only, like:

    env.from(tuples)
      .groupByKey(_.name)

3. *Spout grouping instead of overriding Schema:* Currently, especially in use cases like HdfsAuditLog monitoring, developers who want to group by a certain key are forced to override the Schema (Storm-specific), which is neither good practice nor reusable.

4. *Environment decoupled:* Currently the stream (metadata) / DAG (logic) / environment (execution) are coupled with the Storm internal implementation, which blocks the framework from becoming a metadata-driven pipeline framework in the future, so we should decouple them.
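The groupByKey(_.name) style of grouping described above can be backed by a key-selector abstraction, roughly as in the following sketch (FunctionKeySelector is an illustrative name; Eagle's actual KeySelector may differ):

```scala
// A serializable key selector so the grouping function can be shipped to
// workers; wrapping a plain Scala function keeps the user-facing API clean.
trait KeySelector extends Serializable {
  def key(t: Any): Any
}

case class FunctionKeySelector[T](fn: T => Any) extends KeySelector {
  override def key(t: Any): Any = fn(t.asInstanceOf[T])
}

case class Entity(name: String, value: Double)

// The framework would route each tuple by the selected key, analogous to
// Storm's fieldsGrouping, without any user-declared field names.
val selector = FunctionKeySelector[Entity](_.name)
val tuples = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2))
val grouped = tuples.groupBy(selector.key)
```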
5. *Compatible with the field-based structure of the old framework and applications.*

*Sample Application*

    case class Entity(name: String, value: Double, var inc: Int = 0)

    val tuples = Seq(
      Entity("a", 1),
      Entity("a", 2),
      Entity("a", 3),
      Entity("b", 2),
      Entity("c", 3),
      Entity("d", 3)
    )

    val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)

    // DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3
    env.from(tuples)
      .groupByKey(_.name)
      .map(o => { o.inc += 2; o })
      .filter(_.name != "b")
      .filter(_.name != "c")
      .groupByKey(o => (o.name, o.value))
      .map(o => (o.name, o))
      .map(o => (o._1, o._2.value, o._2.inc))
      .foreach(println)

    env.execute()

*Type is transparent to the developer during both the DAG compiling (programming) and runtime (metadata) phases:*

    2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
    {
      IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
      GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
      MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
      FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
      FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
      GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
      MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
      MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
    }
    2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
    {
      IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
      MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
      FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
      FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
      MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
      MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
    }
    2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
    {
      Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
      Bolt[MapProducer[Entity]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
      Bolt[FilterProducer[Entity]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
      Bolt[FilterProducer[Entity]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
      Bolt[MapProducer[Tuple2]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
      Bolt[MapProducer[Tuple3]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
    }

Regards,
Hao