[ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055628#comment-15055628 ]

ASF GitHub Bot commented on EAGLE-66:
-------------------------------------

GitHub user haoch opened a pull request:

    https://github.com/apache/incubator-eagle/pull/26

    EAGLE-66. Typesafe Streaming DSL and KeyValue based Grouping

    https://issues.apache.org/jira/browse/EAGLE-66

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/incubator-eagle EAGLE-66

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-eagle/pull/26.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #26
    
----
commit 2d7003dd1200a500b329a518d63484600e1c2cdf
Author: Hao Chen <h...@apache.org>
Date:   2015-12-02T16:28:58Z

    [EAGLE-66] Keep type information in StreamProcessor

commit e3389cdd52f7f747db0968d9e233e4184b99f5ff
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T09:25:39Z

    [EAGLE-66] Decouple StreamProtocol and StreamProducer

commit e419ff886316bd536aa3c23aa337e51fa27a8c01
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T13:33:21Z

    [EAGLE-66] Decouple execution environment and config

commit 8b2d7a64f3f600cf49204cd5abff762e70e8791e
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T17:10:32Z

    [EAGLE-66] Simplify the stream processing API and configuration

commit 66abc2283092e04cb866b1280d1a286a14b3e117
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T17:25:39Z

    [EAGLE-66] Refactor AbstractStormSpoutProvider from abstract class to interface StormSpoutProvider

commit 5b50c3fdb457f7f70f1654c8b19dcb1dae08b1cf
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T18:18:14Z

    [EAGLE-66] Decouple ExecutionEnvironments factory functions

commit 5ca63dfa0fd755f008970cfa46dafdf5ac89dc29
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T18:20:54Z

    [EAGLE-66] Rename ConfigWrapper to Configurator

commit f61fde9513de0e65dab7c22cd7f1552022764f58
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T19:15:45Z

    [EAGLE-66] Refactor StreamProtocol `withName` to `as` interface

commit fbc234473d67d75aeece3c8f2d8963b93f11b3a0
Author: Hao Chen <h...@apache.org>
Date:   2015-12-05T22:19:17Z

    [EAGLE-66] Basically workable key-value stream processing API

commit c20bfa141d5db4fd64790db87e7980c05984e671
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T07:24:26Z

    [EAGLE-66] Simplify DAG structure log

commit a0ca7fa15088aded4f06eec8ab44bc2cacbb699c
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T08:27:53Z

    [EAGLE-66] Implement from(Iterable) to replace fromSeq(Seq)

commit 69e68746520ca4648b3987f46d30acc6d6ced79a
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T10:04:05Z

    [EAGLE-66] Clean DAG graph log

commit 7a6a13636ace590c148fc5793893d88b580e1f45
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T17:54:33Z

    [EAGLE-66] Keep entity type info through dataflow and resolve scala/java compatibility issues

commit 9d7b8ceaa6b0b75b132e319da9b6efa5d0f164e1
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T07:18:24Z

    [EAGLE-66] Refactored the package of datastream API

commit 26f6646f745ea4760f7934fb338d4b66a9c31510
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T14:52:35Z

    [EAGLE-66] Support init/reinit method for StreamInfo to extensibly resolve issues like serialization/type and so on

commit 6fb171071a8f09223fd2757b1912bc52e1d4551d
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T17:17:35Z

    [EAGLE-66] Support type for JavaFlatMap

commit c90efc0d34b7156d239a315f115064dc4c15bda9
Author: Hao Chen <h...@apache.org>
Date:   2015-12-08T12:24:24Z

    [EAGLE-66] Fix DAG name and log printing

commit fd11d063896d00de40d06a33030f6be8fa85b881
Author: Hao Chen <h...@apache.org>
Date:   2015-12-08T14:16:41Z

    [EAGLE-66] Resolve JavaTypeCompatible and add StreamTypeExpansion

commit 3b52a02c73e4a4651fd719103d78ea7566d6340b
Author: Hao Chen <h...@apache.org>
Date:   2015-12-14T06:46:12Z

    EAGLE-66. Erase type information and fix hashCode problem for id

commit 9cc8f996fc885dec6d856e965a92185c5a78286a
Author: Hao Chen <h...@apache.org>
Date:   2015-12-14T08:32:07Z

    EAGLE-66. Merge conflicts from master and support dot digraph
    
    Conflicts:
        eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
        eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
        eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
        eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java

----


> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>
> h1. Main Features
> 1. Typesafe API: Currently the stream processing API is not type-safe (type 
> information is erased by the stream framework), so every programming interface 
> exposes Object/AnyRef to the developer, which is neither friendly nor extensible 
> for framework users (application developers):
> {code}
> public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> {code}
> So I propose the following interface, where all type information is transparent 
> to the developer, with no additional parameters required, backed by Scala's 
> implicit TypeTag:
> {code}class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]{code}
> * StreamInfo: contains the core stream information, including streamId, 
> processing element id/name, and entity type info (class/TypeTag)
> * StreamProtocol extends JavaStreamProtocol: contains the basic stream DSL API 
> and a Java-compatible API
> {code}
> class StreamInfo extends Serializable {
>   val id: Int = UniqueId.incrementAndGetId()
>   var name: String = null
>   var streamId: String = null
>   var parallelismNum: Int = 1
>   var inKeyed: Boolean = false
>   var outKeyed: Boolean = false
>   var keySelector: KeySelector = null
>   var typeClass: Class[_] = classOf[AnyRef]
>   @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
> }
> {code}
> The StreamInfo can also be shared through the runtime as an implicit context for 
> the execution layer:
> {code}
> abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt
> {code}
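> For illustration, a minimal sketch of how the type follows the chain (assuming 
> env.from(...) yields a StreamProducer[Entity], with Entity and tuples as defined 
> in the sample application below):
> {code}
> // Sketch only: the element type is carried by StreamProducer[T] end to end
> // instead of being erased to Object/AnyRef.
> val source: StreamProducer[Entity] = env.from(tuples)   // StreamProducer[Entity]
> val pairs = source.map(e => (e.name, e.value))          // StreamProducer[(String, Double)]
> pairs.filter(_._2 > 1).foreach(println)                 // still typed, no casting needed
> {code}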
> 2. KeyValue Based Structure: Currently framework users (developers) have to deal 
> with field declarations again and again, so the framework and the business logic 
> are tightly coupled. Per the StreamProtocol description, users should not have to 
> care about framework-level details such as Storm's internal data structure 
> (List<Object> with Fields<String>), which is not developer friendly; users should 
> be able to focus on business logic only, for example:
> {code}
> env.from(tuples)
>       .groupByKey(_.name)
> {code}
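> Because the key is just a function of the entity, composite keys need no extra 
> field declaration either (a sketch reusing the same tuples):
> {code}
> // Sketch: composite key derived directly from the entity, no Fields<String> declaration
> env.from(tuples).groupByKey(e => (e.name, e.value))
> {code}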
> 3. Spout grouping instead of overriding Schema: Currently, especially in use 
> cases like HDFS audit log monitoring, developers who want to group by a certain 
> key are forced to override the (Storm-specific) Schema, which is neither clean 
> nor reusable; grouping should instead be declared on the stream itself.
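> A minimal sketch of the intended usage (auditLogStream and the user key below 
> are illustrative placeholders, not existing API):
> {code}
> // Sketch only: auditLogStream stands for whatever StreamProducer the HDFS audit
> // log spout source yields, and "user" is an illustrative key field of its entity type.
> auditLogStream
>   .groupByKey(_.user)   // grouping declared in the DSL, no Storm Schema override
>   .foreach(println)
> {code}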
> 4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and 
> environment (execution) are coupled with the Storm-internal implementation, which 
> blocks the evolution toward a metadata-driven pipeline framework, so we should 
> decouple them, as sketched below.
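> A sketch of the intended separation, reusing the sample application's environment 
> and tuples: only the environment lookup and execute() know about Storm, while the 
> DAG in between is expressed purely through the stream DSL.
> {code}
> // Sketch: the Storm binding is confined to these two lines ...
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // ... while the DAG itself depends only on the StreamProtocol DSL
> env.from(tuples).groupByKey(_.name).foreach(println)
> env.execute()
> {code}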
>  
> 5. Compatibility with the field-based structure of the old framework and applications.
> 6. Configuration: an enhanced config wrapper on top of typesafe-config that 
> supports get/set/default and is integrated with ExecutionEnvironment:
> {code}
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> val streamName = env.config.get[String]("eagle.stream.name", "eventStream")
> val streamExecutorId = env.config.get[String]("eagle.stream.executor", s"${streamName}Executor")
> env.config.set("dataSourceConfig.deserializerClass", classOf[JsonMessageDeserializer].getCanonicalName)
> {code}
> h1. Sample Application
> {code}
> case class Entity(name:String,value:Double,var inc:Int=0)
> val tuples = Seq(
>       Entity("a", 1),
>       Entity("a", 2),
>       Entity("a", 3),
>       Entity("b", 2),
>       Entity("c", 3),
>       Entity("d", 3)
> )
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3 
> env.from(tuples)
>       .groupByKey(_.name)
>       .map(o => {o.inc += 2;o})
>       .filter(_.name != "b")
>       .filter(_.name != "c")
>       .groupByKey(o=>(o.name,o.value))
>       .map(o => (o.name,o))
>       .map(o => (o._1,o._2.value,o._2.inc))
>       .foreach(println)
>  env.execute()
> {code}
> Type information is transparent to the developer during both the DAG compiling 
> (programming) and runtime (metadata) phases:
> {code}
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> {
>       IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>       GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
>       MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>       FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>       FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>       GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
>       MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>       MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> {
>       IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
>       MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>       FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>       FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
>       MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>       MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> {
>       Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>       Bolt[MapProducer[Entity]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
>       Bolt[FilterProducer[Entity]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
>       Bolt[FilterProducer[Entity]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
>       Bolt[MapProducer[Tuple2]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
>       Bolt[MapProducer[Tuple3]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
