[ 
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055776#comment-15055776
 ] 

ASF GitHub Bot commented on EAGLE-66:
-------------------------------------

Github user RalphSu commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/26#discussion_r47481029
  
    --- Diff: eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java ---
    @@ -0,0 +1,25 @@
    +package org.apache.eagle.datastream;
    --- End diff --
    
    I think this might help:
    http://creadur.apache.org/rat/apache-rat-plugin/rat-mojo.html


> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>
> h1. Main Features
> 1. Typesafe API: Currently the stream processing API is not type-safe (type 
> information is erased by the stream framework). All programming interfaces 
> exposed to developers operate on Object/AnyRef, which is neither friendly 
> nor extensible for framework users (application developers):
> {code}
> public void flatMap(java.util.List<Object> input,
>     Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> {code}
> So I propose the following interface (all type information is transparent 
> to the developer and needs no additional parameters, because it is supplied 
> by Scala implicit TypeTags):
> {code}class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]{code}
> * StreamInfo:  contains Stream core information including streamId, 
> processing element id/name, entity type info (class/TypeTag)
> * StreamProtocol extends JavaStreamProtocol: contains basic Stream DSL API 
> and java-compatible API
> {code}
> class StreamInfo  extends Serializable{
>   val id:Int = UniqueId.incrementAndGetId()
>   var name: String = null
>   var streamId:String=null
>   var parallelismNum: Int = 1
>   var inKeyed:Boolean = false
>   var outKeyed:Boolean = false
>   var keySelector:KeySelector = null
>   var typeClass:Class[_] = classOf[AnyRef]
>   @transient  implicit var typeTag:ru.TypeTag[_] = ru.typeTag[AnyRef]
> }
> {code}
> StreamInfo can also be shared at runtime as an implicit context for the 
> execution layer:
> {code}
> abstract class AbstractStreamBolt[T](val fieldsNum:Int=0,
>     val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt
> {code}
> 2. KeyValue Based Structure: Currently framework users (developers) have to 
> repeat field declarations again and again, and framework and business logic 
> are tightly coupled. According to the StreamProtocol description, users 
> should not need to care about framework-level details such as Storm's 
> internal data structure (List<Object> with Fields<String>), which is 
> unfriendly to developers. Users should be able to focus on business logic 
> only, for example:
> {code}
> env.from(tuples)
>       .groupByKey(_.name)
> {code}
> 3. Spout grouping instead of overriding Schema: Currently, especially in use 
> cases like HdfsAuditLog monitoring, developers who want to group by a 
> certain key are forced to override the Schema (specific to Storm), which is 
> awkward and not reusable.
> 4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and 
> environment (execution) are coupled with Storm's internal implementation, 
> which stands in the way of becoming a metadata-driven pipeline framework in 
> the future, so we should decouple them.
>  
> 5. Compatible with the field-based structure in the old framework and 
> applications.
> 6. Configuration: an enhanced config wrapper on top of typesafe-config that 
> supports get/set/default and is integrated with ExecutionEnvironment:
> {code}
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> val streamName = env.config.get[String]("eagle.stream.name","eventStream")
> val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
> env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
> {code}
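
The get/set/default behaviour described in item 6 can be sketched roughly as
below. This is only an illustration under stated assumptions: SketchConfig is
a hypothetical name, and it is backed by a plain mutable Map rather than
typesafe-config, so it is not the wrapper from the pull request.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the config wrapper; the real one is described
// as wrapping typesafe-config, this one only wraps a mutable Map.
final class SketchConfig(initial: Map[String, Any] = Map.empty) {
  private val values = mutable.Map[String, Any]() ++= initial

  // Return the stored value for `key`, or `default` when the key is absent.
  // The cast is unchecked: the caller is trusted to ask for the right type.
  def get[T](key: String, default: T): T =
    values.get(key).map(_.asInstanceOf[T]).getOrElse(default)

  // Store a value, overwriting any previous one for the same key.
  def set(key: String, value: Any): Unit = values.update(key, value)
}

object SketchConfigDemo {
  def main(args: Array[String]): Unit = {
    val config = new SketchConfig(Map("eagle.stream.name" -> "auditStream"))
    // Present key: the stored value wins over the default.
    val streamName = config.get[String]("eagle.stream.name", "eventStream")
    // Absent key: the default, derived from the stream name, is returned.
    val executorId =
      config.get[String]("eagle.stream.executor", s"${streamName}Executor")
    config.set("dataSourceConfig.deserializerClass", "JsonMessageDeserializer")
    println(s"$streamName $executorId")
  }
}
```

Passing the default at the call site keeps fallback values next to the code
that uses them, which seems to be the intent of the get(key, default) form
shown in the issue.
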
> h1. Sample Application
> {code}
> case class Entity(name:String,value:Double,var inc:Int=0)
> val tuples = Seq(
>       Entity("a", 1),
>       Entity("a", 2),
>       Entity("a", 3),
>       Entity("b", 2),
>       Entity("c", 3),
>       Entity("d", 3)
> )
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // The DAG tracks element types automatically: Entity -> Tuple2 -> Tuple3
> env.from(tuples)
>       .groupByKey(_.name)
>       .map(o => {o.inc += 2;o})
>       .filter(_.name != "b")
>       .filter(_.name != "c")
>       .groupByKey(o=>(o.name,o.value))
>       .map(o => (o.name,o))
>       .map(o => (o._1,o._2.value,o._2.inc))
>       .foreach(println)
> env.execute()
> {code}
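
One way to picture how element types can follow the pipeline without extra
parameters is the sketch below. It is a guess at the mechanism, not Eagle's
code: TypedStream is a hypothetical in-memory class, and it uses ClassTag from
the standard library instead of the TypeTag named in the issue, so only the
erased class (Tuple2, not Tuple2[String, Entity]) is recorded.

```scala
import scala.reflect.ClassTag

// Hypothetical in-memory stand-in for a typed stream producer.
// `elementClass` plays the role that StreamInfo.typeClass has in the issue.
final class TypedStream[T](val elements: Seq[T])(implicit tag: ClassTag[T]) {
  // Captured once at construction from the implicit tag.
  val elementClass: Class[_] = tag.runtimeClass

  // The context bound makes the compiler supply the tag for the new
  // element type R, so callers never pass type information by hand.
  def map[R: ClassTag](f: T => R): TypedStream[R] =
    new TypedStream(elements.map(f))

  // Filtering keeps the element type, so the existing tag is reused.
  def filter(p: T => Boolean): TypedStream[T] =
    new TypedStream(elements.filter(p))
}

case class Entity(name: String, value: Double, inc: Int = 0)

object TypedStreamDemo {
  def main(args: Array[String]): Unit = {
    val entities = new TypedStream(Seq(Entity("a", 1), Entity("b", 2)))
    val pairs = entities.filter(_.name != "b").map(e => (e.name, e))
    println(entities.elementClass.getSimpleName)
    println(pairs.elementClass.getSimpleName)
  }
}
```

Each stage carries its own runtime class, so a compiler or printer walking the
stages can label them by element type without the application passing any
class objects around.
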
> Types are transparent to the developer during both the DAG compiling 
> (programming) and runtime (metadata) phases:
> {code}
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> {
>       IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>       GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
>       MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>       FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>       FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>       GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
>       MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>       MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> {
>       IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
>       MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>       FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>       FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
>       MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>       MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> {
>       Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>       Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
>       Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
>       Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
>       Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
>       Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> }
> {code}
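
Comparing the "Before expanded" and "After expanded" graphs in the log above,
the expansion step appears to remove each GroupByKeyProducer node and turn it
into a groupByKey grouping on the edge that joins its predecessor to its
successor. A minimal sketch of that rewrite follows. Node, Edge, Grouping and
DagExpansion are hypothetical names chosen for illustration, and the sketch
assumes every group-by node has exactly one successor and is not directly
followed by another group-by node.

```scala
// Hypothetical model of the logical DAG; not Eagle's own classes.
sealed trait Grouping
case object Shuffle extends Grouping
case object ByKey extends Grouping

final case class Node(id: Int, label: String, isGroupBy: Boolean = false)
final case class Edge(from: Node, to: Node, grouping: Grouping = Shuffle)

object DagExpansion {
  // Drop every group-by node and connect its predecessor straight to its
  // successor, marking the new edge as key-grouped.
  def expand(edges: Seq[Edge]): Seq[Edge] = {
    // For each group-by node id, the node that consumes its output.
    val successorOf = edges.collect {
      case Edge(from, to, _) if from.isGroupBy => from.id -> to
    }.toMap
    edges.collect {
      // Edge into a group-by node: redirect it to that node's successor.
      case Edge(from, to, _) if to.isGroupBy =>
        Edge(from, successorOf(to.id), ByKey)
      // Ordinary edge: keep as is. Edges leaving a group-by node match
      // neither case and are therefore dropped.
      case e if !e.from.isGroupBy => e
    }
  }

  def main(args: Array[String]): Unit = {
    val source  = Node(1, "IterableStreamProducer[Entity]")
    val groupBy = Node(2, "GroupByKeyProducer", isGroupBy = true)
    val map     = Node(3, "MapProducer[Entity]")
    val filter  = Node(4, "FilterProducer[Entity]")
    val before  = Seq(Edge(source, groupBy), Edge(groupBy, map), Edge(map, filter))
    expand(before).foreach { e =>
      println(s"${e.from.label}_${e.from.id} -> ${e.to.label}_${e.to.id} in ${e.grouping}")
    }
  }
}
```

Treating group-by as an edge property rather than a processing node would
explain why the Storm topology in the log has no bolt for ids 2 and 6.
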



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
