[ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055628#comment-15055628 ]
ASF GitHub Bot commented on EAGLE-66:
-------------------------------------

GitHub user haoch opened a pull request:

    https://github.com/apache/incubator-eagle/pull/26

    EAGLE-66. Typesafe Streaming DSL and KeyValue based Grouping

    https://issues.apache.org/jira/browse/EAGLE-66

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/incubator-eagle EAGLE-66

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-eagle/pull/26.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #26

----
commit 2d7003dd1200a500b329a518d63484600e1c2cdf
Author: Hao Chen <h...@apache.org>
Date:   2015-12-02T16:28:58Z

    [EAGLE-66] Keep type information in StreamProcessor

commit e3389cdd52f7f747db0968d9e233e4184b99f5ff
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T09:25:39Z

    [EAGLE-66] Decouple StreamProtocol and StreamProducer

commit e419ff886316bd536aa3c23aa337e51fa27a8c01
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T13:33:21Z

    [EAGLE-66] Decouple execution environment and config

commit 8b2d7a64f3f600cf49204cd5abff762e70e8791e
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T17:10:32Z

    [EAGLE-66] Simplify the stream processing API and configuration

commit 66abc2283092e04cb866b1280d1a286a14b3e117
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T17:25:39Z

    [EAGLE-66] Refactor AbstractStormSpoutProvider from abstract class to interface StormSpoutProvider

commit 5b50c3fdb457f7f70f1654c8b19dcb1dae08b1cf
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T18:18:14Z

    [EAGLE-66] Decouple ExecutionEnvironments factory functions

commit 5ca63dfa0fd755f008970cfa46dafdf5ac89dc29
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T18:20:54Z

    [EAGLE-66] Rename ConfigWrapper to Configurator

commit f61fde9513de0e65dab7c22cd7f1552022764f58
Author: Hao Chen <h...@apache.org>
Date:   2015-12-04T19:15:45Z

    [EAGLE-66] Refactor StreamProtocol `withName` to `as` interface

commit fbc234473d67d75aeece3c8f2d8963b93f11b3a0
Author: Hao Chen <h...@apache.org>
Date:   2015-12-05T22:19:17Z

    [EAGLE-66] Basically workable key-value stream processing API

commit c20bfa141d5db4fd64790db87e7980c05984e671
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T07:24:26Z

    [EAGLE-66] Simplify DAG structure log

commit a0ca7fa15088aded4f06eec8ab44bc2cacbb699c
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T08:27:53Z

    [EAGLE-66] Implement from(Iterable) to replace fromSeq(Seq)

commit 69e68746520ca4648b3987f46d30acc6d6ced79a
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T10:04:05Z

    [EAGLE-66] Clean DAG graph log

commit 7a6a13636ace590c148fc5793893d88b580e1f45
Author: Hao Chen <h...@apache.org>
Date:   2015-12-06T17:54:33Z

    [EAGLE-66] Keep entity type info through dataflow and resolve scala/java compatibility issues

commit 9d7b8ceaa6b0b75b132e319da9b6efa5d0f164e1
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T07:18:24Z

    [EAGLE-66] Refactored the package of datastream API

commit 26f6646f745ea4760f7934fb338d4b66a9c31510
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T14:52:35Z

    [EAGLE-66] Support init/reinit method for StreamInfo to extensibly resolve issues like serialization/type and so on

commit 6fb171071a8f09223fd2757b1912bc52e1d4551d
Author: Hao Chen <h...@apache.org>
Date:   2015-12-07T17:17:35Z

    [EAGLE-66] Support type for JavaFlatMap

commit c90efc0d34b7156d239a315f115064dc4c15bda9
Author: Hao Chen <h...@apache.org>
Date:   2015-12-08T12:24:24Z

    [EAGLE-66] Fix DAG name and log printing

commit fd11d063896d00de40d06a33030f6be8fa85b881
Author: Hao Chen <h...@apache.org>
Date:   2015-12-08T14:16:41Z

    [EAGLE-66] Resolve JavaTypeCompatible and add StreamTypeExpansion

commit 3b52a02c73e4a4651fd719103d78ea7566d6340b
Author: Hao Chen <h...@apache.org>
Date:   2015-12-14T06:46:12Z

    EAGLE-66. Erase type information and fix hashCode problem for id

commit 9cc8f996fc885dec6d856e965a92185c5a78286a
Author: Hao Chen <h...@apache.org>
Date:   2015-12-14T08:32:07Z

    EAGLE-66. Merge conflicts from master and support dot digraph

    Conflicts:
        eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
        eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
        eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
        eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java

----


> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h1. Main Features
> 1. Typesafe API: Currently the stream processing API is not type-safe (type
> information is erased by the stream framework), and every programming
> interface exposed to developers is typed as Object/AnyRef, which is neither
> friendly nor extensible for framework users (application developers):
> {code}
> public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> {code}
> So I propose the following interface, in which all type information is
> transparent to the developer and no additional parameters are needed
> (supported by Scala implicit TypeTag):
> {code}
> class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
> {code}
> * StreamInfo: contains the core stream information, including streamId,
> processing element id/name, and entity type info (class/TypeTag)
> * StreamProtocol extends JavaStreamProtocol: contains the basic Stream DSL API
> and the Java-compatible API
> {code}
> import scala.reflect.runtime.{universe => ru}
>
> class StreamInfo extends Serializable{
>   val id:Int = UniqueId.incrementAndGetId()
>   var name: String = null
>   var streamId:String=null
>   var parallelismNum: Int = 1
>   var inKeyed:Boolean = false
>   var outKeyed:Boolean = false
>   var keySelector:KeySelector = null
>   var typeClass:Class[_] = classOf[AnyRef]
>   @transient implicit var typeTag:ru.TypeTag[_] = ru.typeTag[AnyRef]
> }
> {code}
> StreamInfo can also be shared through the runtime as an implicit context for
> the execution layer:
> {code}
> abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt
> {code}
> 2. KeyValue Based Structure: currently the framework user (developer) has to
> handle field declarations again and again, and the framework and business
> logic are tightly coupled. Per the StreamProtocol description, users should
> not need to care about framework-level details such as Storm's internal data
> structure (List<Object> with Fields<String>), which is not developer-friendly.
> We should let users focus only on business logic, for example:
> {code}
> env.from(tuples)
>   .groupByKey(_.name)
> {code}
> 3. Spout grouping instead of overriding Schema: currently, especially in use
> cases like HdfsAuditLog monitoring, a developer who wants to group by a
> certain key is forced to override the Schema (which is Storm-specific); this
> is not good and not reusable.
> 4. Environment Decoupled: currently the stream (metadata), DAG (logic) and
> environment (execution) are coupled with Storm's internal implementation,
> which stands in the way of becoming a metadata-driven pipeline framework in
> the future, so we should decouple them.
>
> 5. Compatible with the field-based structure in the old framework and
> applications.
> 6. Configuration: an enhanced config wrapper on top of typesafe-config,
> supporting get/set/default and integrated with ExecutionEnvironment:
> {code}
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> val streamName = env.config.get[String]("eagle.stream.name","eventStream")
> val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
>
> env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
> {code}
> h1. Sample Application
> {code}
> case class Entity(name:String,value:Double,var inc:Int=0)
> val tuples = Seq(
>   Entity("a", 1),
>   Entity("a", 2),
>   Entity("a", 3),
>   Entity("b", 2),
>   Entity("c", 3),
>   Entity("d", 3)
> )
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // The DAG automatically tracks element types: Entity -> Tuple2 -> Tuple3
> env.from(tuples)
>   .groupByKey(_.name)
>   .map(o => {o.inc += 2;o})
>   .filter(_.name != "b")
>   .filter(_.name != "c")
>   .groupByKey(o=>(o.name,o.value))
>   .map(o => (o.name,o))
>   .map(o => (o._1,o._2.value,o._2.inc))
>   .foreach(println)
> env.execute()
> {code}
> Types are transparent to the developer during both the DAG compiling
> (programming) and runtime (metadata) phases:
> {code}
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> {
>   IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>   GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
>   MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>   FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>   FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>   GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
>   MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>   MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> {
>   IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
>   MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>   FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
>   FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
>   MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>   MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> {
>   Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>   Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
>   Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
>   Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
>   Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
>   Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> }
> {code}

-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
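The Typesafe API proposal (point 1 of the quoted issue) relies on the compiler supplying type evidence implicitly, so the developer never passes type parameters by hand. Below is a minimal, self-contained sketch of that idea. `TypedProducer` and `StreamInfoSketch` are hypothetical names, not Eagle classes. It also swaps the `TypeTag` mentioned in the issue for `scala.reflect.ClassTag`, which records only the erased runtime class but needs no scala-reflect dependency. Each operator asks for evidence of its output type, so the element class is known at every node.

```scala
import scala.reflect.ClassTag

// Hypothetical, trimmed-down stand-in for Eagle's StreamInfo: it keeps only
// a name and the element's runtime class.
class StreamInfoSketch(val name: String, val typeClass: Class[_]) extends Serializable

// Hypothetical producer. The compiler fills in the implicit ClassTag, so the
// caller never passes type information explicitly.
class TypedProducer[T](val name: String, val elements: Seq[T])(implicit tag: ClassTag[T]) {
  val info = new StreamInfoSketch(name, tag.runtimeClass)

  // map needs fresh evidence for the new element type R.
  def map[R](f: T => R)(implicit rt: ClassTag[R]): TypedProducer[R] =
    new TypedProducer[R](s"map($name)", elements.map(f))

  // filter keeps T, so the producer's own implicit tag is reused.
  def filter(p: T => Boolean): TypedProducer[T] =
    new TypedProducer[T](s"filter($name)", elements.filter(p))
}

case class Entity(name: String, value: Double)

val source = new TypedProducer("source", Seq(Entity("a", 1), Entity("b", 2)))
val pairs = source.filter(_.name != "b").map(e => (e.name, e.value))

println(source.info.typeClass == classOf[Entity])          // true
println(pairs.info.typeClass == classOf[(String, Double)]) // true
```

Because `filter` preserves the element type it can reuse the evidence it already holds, while `map` must obtain new evidence for `R`. This mirrors how the logged DAG moves from `Entity` to `Tuple2` to `Tuple3` without any explicit type arguments in the sample application.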
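Points 2 and 3 of the issue ask for grouping driven by a key function such as `groupByKey(_.name)` instead of Storm `Fields` declarations or an overridden Schema. One common way to realize this, assumed here rather than taken from the Eagle code, is to hash the extracted key and take it modulo the downstream parallelism, so that equal keys always reach the same task. `KeySelectorSketch` and `partition` are hypothetical names.

```scala
// Hypothetical key selector: hashes the key chosen by a user function to pick
// one of `parallelism` downstream tasks, so no Storm Fields are declared.
final case class KeySelectorSketch[T, K](select: T => K) {
  def partition(element: T, parallelism: Int): Int = {
    require(parallelism > 0, "parallelism must be positive")
    // floorMod keeps the task index non-negative for negative hash codes.
    Math.floorMod(select(element).hashCode, parallelism)
  }
}

case class Entity(name: String, value: Double)

val entities = Seq(
  Entity("a", 1), Entity("a", 2), Entity("a", 3),
  Entity("b", 2), Entity("c", 3), Entity("d", 3)
)

// Counterpart of groupByKey(_.name): every "a" entity lands on the same task.
val byName = KeySelectorSketch[Entity, String](_.name)
val tasks: Map[Int, Seq[Entity]] = entities.groupBy(e => byName.partition(e, 3))

// Composite keys work the same way, as in groupByKey(o => (o.name, o.value)).
val byNameAndValue = KeySelectorSketch[Entity, (String, Double)](o => (o.name, o.value))
```

The key function is ordinary user code, so the same selector can describe the grouping of a spout's output or a bolt's input; a runtime would only need the selector and the parallelism of the next stage.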
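Point 6 of the issue describes a config wrapper offering `get` with a default and `set`, on top of typesafe-config. The sketch below mimics only those semantics with an in-memory map so it runs without the typesafe-config dependency. `ConfiguratorSketch` is a hypothetical name; the real wrapper (renamed from `ConfigWrapper` to `Configurator` in the commit list) may behave differently, for example when a key exists but holds a value of another type.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical in-memory stand-in for a typesafe-config wrapper. It only
// mimics get-with-default and set; no typesafe-config API is used.
class ConfiguratorSketch(initial: Map[String, Any] = Map.empty) {
  private val values = mutable.Map.empty[String, Any]
  values ++= initial

  // Returns the value at `path` when it exists and has type T, else `default`.
  // The implicit ClassTag makes the `v: T` type test checkable at runtime.
  def get[T](path: String, default: T)(implicit tag: ClassTag[T]): T =
    values.get(path) match {
      case Some(v: T) => v
      case _          => default
    }

  def set(path: String, value: Any): Unit = values(path) = value
}

val config = new ConfiguratorSketch(Map("eagle.stream.name" -> "auditStream"))
val streamName = config.get[String]("eagle.stream.name", "eventStream")
val streamExecutorId = config.get[String]("eagle.stream.executor", s"${streamName}Executor")
config.set("dataSourceConfig.deserializerClass", "JsonMessageDeserializer")
```

Here `eagle.stream.name` is present, so `streamName` is `auditStream`, while the missing `eagle.stream.executor` falls back to a default derived from it, matching the pattern used in the issue's configuration snippet.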
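The "Before expanded DAG" and "After expanded DAG" logs of the sample application show each `GroupByKeyProducer` node being removed and its key function reappearing as the grouping of the edge that bridges its neighbours (`_1 -> _3` and `_5 -> _7`). Below is a minimal sketch of that rewrite. It assumes a hypothetical `Node`/`Edge` model rather than Eagle's actual producer graph, and that a group-by node is never directly followed by another group-by node. Under those assumptions it reproduces the edge list of the "After expanded DAG" log.

```scala
// Hypothetical, simplified DAG model (not Eagle's StreamProducer graph).
sealed trait Grouping
case object ShuffleGroup extends Grouping
final case class GroupByKey(keyFn: String) extends Grouping

final case class Node(id: Int, label: String, isGroupByKey: Boolean = false)
final case class Edge(from: Node, to: Node, grouping: Grouping)

// Removes every group-by node and reconnects its predecessor to its successor
// with an edge carrying the group-by grouping. Assumes a group-by node is
// never directly followed by another group-by node.
def expandGroupByKey(edges: Seq[Edge]): Seq[Edge] = {
  val groupNodes: Set[Node] =
    edges.flatMap(e => Seq(e.from, e.to)).filter(_.isGroupByKey).toSet
  val kept = edges.filterNot(e => groupNodes(e.from) || groupNodes(e.to))
  val bridged = for {
    inEdge  <- edges if groupNodes(inEdge.to)
    outEdge <- edges if outEdge.from == inEdge.to
  } yield Edge(inEdge.from, outEdge.to, GroupByKey(inEdge.to.label))
  (kept ++ bridged).sortBy(_.from.id)
}

// The "Before expanded DAG" of the sample application, with labels shortened.
val nodes = Map(
  1 -> Node(1, "IterableStreamProducer[Entity]"),
  2 -> Node(2, "<function1>", isGroupByKey = true),
  3 -> Node(3, "MapProducer[Entity]"),
  4 -> Node(4, "FilterProducer[Entity]"),
  5 -> Node(5, "FilterProducer[Entity]"),
  6 -> Node(6, "<function1>", isGroupByKey = true),
  7 -> Node(7, "MapProducer[Tuple2]"),
  8 -> Node(8, "MapProducer[Tuple3]"),
  9 -> Node(9, "ForeachProducer[void]")
)
val before = (1 to 8).map(i => Edge(nodes(i), nodes(i + 1), ShuffleGroup))
val after = expandGroupByKey(before)
after.foreach(e => println(s"${e.from.label}_${e.from.id} -> ${e.to.label}_${e.to.id} in ${e.grouping}"))
```

Per the StormTopologyCompiler log, compiling to Storm then maps each remaining node to a spout or bolt while keeping the edge groupings; that step is not modeled in this sketch.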