*TODO*

✓ Typesafe API
✓ groupByKey
☐ Test compatibility for old application
☐ Refactor package to separate core/storm/spark/others
☐ Support KeyValue based grouping for old application and existing executor
/ spout
☐ Support groupByKey(fields: String*)
☐ Refactor Alert Executor with groupByKey and StreamInfo, avoid passing
upStreamNames
☐ Siddhi support as general engine: sql(String)
☐ Refactor Hdfs Schema Manifest (highly coupled for developers now)
☐ State Management: DAG flow, Performance Metrics, Visualization
☐ Java-compatible interface for function-type parameters such as mapper
and filter, modeled on keySelector
☐ Refactor configuration style (maybe Qingwen could help about it)
☐ DAG Serialization/Deserialization (function serialization? json ->
function? only support SQL at first phase)

Regards,
Hao

On Mon, Dec 7, 2015 at 3:47 PM, Hao Chen <h...@apache.org> wrote:

>
> JIRA: https://issues.apache.org/jira/browse/EAGLE-66
> Pull Request: https://github.com/apache/incubator-eagle/pull/17
>
> *Main Changes*
>
> 1. *Typesafe API:* Currently the stream processing API is not type-safe
> (type information is erased by the stream framework), so every programming
> interface exposed to developers works on Object/AnyRef, which is neither
> friendly nor extensible for framework users (application developers):
>
> public void flatMap(java.util.List<*Object*> input,
> Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
>
> So I propose the following interface (all type information is transparent
> to the developer with no additional parameters needed, supported by Scala
> implicit TypeTag):
>
> *class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]*
>
>    - *StreamInfo: * contains Stream core information including streamId,
>    processing element id/name, entity type info (class/TypeTag)
>    - *StreamProtocol extends JavaStreamProtocol*: contains basic Stream
>    DSL API and java-compatible API
>
> class *StreamInfo*  extends Serializable{
>   val id:Int = UniqueId.incrementAndGetId()
>   var name: String = null
>   var streamId:String=null
>   var parallelismNum: Int = 1
>   var inKeyed:Boolean = false
>   var outKeyed:Boolean = false
>   var keySelector:KeySelector = null
>
>   var typeClass:Class[_] = classOf[AnyRef]
>   @transient implicit var typeTag:ru.TypeTag[_] = ru.typeTag[AnyRef]
> }
>
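
A minimal sketch of how an implicit TypeTag can carry the element type through each operator, assuming Scala 2 with scala-reflect on the classpath. TypedStream and its methods are hypothetical stand-ins for illustration, not the actual StreamProducer from the pull request:

```scala
import scala.reflect.runtime.{universe => ru}

// Hypothetical, simplified stand-in for a typed stream producer:
// the compiler supplies TypeTag[T] implicitly, so no extra parameter is needed.
class TypedStream[T](val elements: Seq[T])(implicit val typeTag: ru.TypeTag[T]) {

  // Each transformation captures the result type R at the call site.
  def map[R](f: T => R)(implicit tag: ru.TypeTag[R]): TypedStream[R] =
    new TypedStream[R](elements.map(f))

  // Type information stays available at runtime, e.g. for printing a DAG.
  def typeName: String = typeTag.tpe.toString
}

case class Entity(name: String, value: Double, var inc: Int = 0)

object TypedStreamDemo {
  def main(args: Array[String]): Unit = {
    val source = new TypedStream(Seq(Entity("a", 1), Entity("b", 2)))
    val pairs  = source.map(o => (o.name, o.value))
    println(source.typeName) // element type of the source stream
    println(pairs.typeName)  // element type after map
  }
}
```

The user-facing lambda is written against Entity directly, while the tag recorded on each stream is what a DAG printer or compiler could read later.
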
> 2. *KeyValue Based Structure: *currently framework users (developers) have
> to declare fields again and again, and framework and business logic are
> highly coupled. According to the StreamProtocol description, users should
> not care about framework-level details such as the internal data structure
> Storm uses (List<Object> with Fields<String>), which is not
> developer-friendly. We should make sure users focus on business logic
> only, like:
>
> env.from(tuples)
> .groupByKey(_.name)
>
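
To illustrate what a key-selector based grouping has to do underneath, here is a rough sketch that routes each element to a partition by the hash of its selected key. KeyGrouping, partitionOf and the parallelism parameter are hypothetical names for illustration only; the real implementation compiles groupByKey into a Storm grouping instead:

```scala
case class Entity(name: String, value: Double, var inc: Int = 0)

// Hypothetical helper, not part of Eagle.
object KeyGrouping {

  // Pick a partition from the key's hash; `##` is null-safe, and the
  // adjustment keeps the result in [0, parallelism) for negative hashes.
  def partitionOf[T, K](element: T, parallelism: Int)(keySelector: T => K): Int = {
    val h = keySelector(element).## % parallelism
    if (h < 0) h + parallelism else h
  }

  // Elements with equal keys always end up in the same partition.
  def groupByKey[T, K](elements: Seq[T], parallelism: Int)(keySelector: T => K): Map[Int, Seq[T]] =
    elements.groupBy(e => partitionOf(e, parallelism)(keySelector))
}

object KeyGroupingDemo {
  def main(args: Array[String]): Unit = {
    val tuples = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2))
    // The user only states the key; no field names are declared anywhere.
    KeyGrouping.groupByKey(tuples, 4)(_.name).foreach(println)
  }
}
```
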
> 3. *Spout grouping instead of overriding Schema*: currently, especially in
> use cases like HdfsAuditLog Monitoring, a developer who wants to group by a
> certain key is forced to override the Schema (Storm-specific), which is
> neither clean nor reusable.
>
> 4. *Environment Decoupled: *currently the stream (metadata), DAG (logic)
> and environment (execution) are coupled with Storm's internal
> implementation, which blocks the way to a metadata-driven pipeline
> framework in the future, so we should decouple them.
>
> *5. Compatible with Field-based Structure in old framework and
> application.*
>
> *Sample Application*
>
> case class Entity(name:String,value:Double,var inc:Int=0)
>
> val tuples = Seq(
> Entity("a", 1),
> Entity("a", 2),
> Entity("a", 3),
> Entity("b", 2),
> Entity("c", 3),
> Entity("d", 3)
> )
>
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
>
>
> *// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3 *
>
> env.from(tuples)
> *.groupByKey(_.name)*
> .map(o => {o.inc += 2;o})
> .filter(_.name != "b")
> .filter(_.name != "c")
> *.groupByKey(o=>(o.name,o.value))*
> .map(o => (o.name,o))
> .map(o => (o._1,o._2.value,o._2.inc))
> .foreach(println)
>
>  env.execute()
>
>
> *Type is transparent for developer during both DAG compiling (programming)
> and runtime (metadata) phases*
>
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
> expanded DAG
> {
> IterableStreamProducer[*Entity*]_1{1} ->
> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
> GroupByKeyProducer[<function1>(*Entity*)]_2{1} ->
> MapProducer[Entity]_3{1} in shuffleGroup
> MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
> shuffleGroup
> FilterProducer[*Entity*]_5{1} ->
> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
> GroupByKeyProducer[<function1>(*Entity*)]_6{1} ->
> MapProducer[Tuple2]_7{1} in shuffleGroup
> MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After
> expanded DAG
> {
> IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
> groupByKey(<function1>)
> MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
> shuffleGroup
> FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
> groupByKey(<function1>)
> MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]:
> Storm topology DAG
> {
>   Spout[IterableStreamProducer[*Entity*]_1]{1} ->
> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
> Bolt[MapProducer[*Entity*]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
> shuffleGroup
> Bolt[FilterProducer[*Entity*]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1}
> in shuffleGroup
> Bolt[FilterProducer[*Entity*]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
> groupByKey(<function1>)
> Bolt[MapProducer[*Tuple2*]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
> shuffleGroup
> Bolt[MapProducer[*Tuple3*]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in
> shuffleGroup
> }
>
>
> Regards,
> Hao
>
