[ https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071906#comment-15071906 ]
Hao Chen commented on EAGLE-98:
-------------------------------

After some practice, we should also support another, more expressive kind of grok filter:

{code}
filter("logStream") by grok {
  pattern("field" -> """""".r)
  add_field("new_field" -> "value")
}
{code}

> Eagle Declarative Topology Definition DSL
> -----------------------------------------
>
>                 Key: EAGLE-98
>                 URL: https://issues.apache.org/jira/browse/EAGLE-98
>             Project: Eagle
>          Issue Type: New Feature
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>
> h2. Features
> * High-level, stream-oriented
> * Declarative streaming
> * Metadata driven
> * Native Scala internal DSL
> * Supports Scala programming, or script/configuration in *.egl files
> * Supports static policy definition and a dynamic policy loader
> * IDE-friendly features such as the sql prefix and XML as the email template
> * Name references
> h2. Syntax
> {code:language=scala}
> // Topology definition API, by extension or script
> import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
> import org.apache.eagle.stream.dsl.experimental.DruidInterface._
> // #!/bin/bash
> // exec scala "$0" "$@"
> // !#
> // # start
> define ("metricStream_1") as ("name" -> 'string, "value" -> 'double, "timestamp" -> 'long) from
>   kafka(topic="metricStream_1", zk=conf"kafka.zk.hosts", deserializer="")
> define ("metricStream_2") as ("name" -> 'string, "value" -> 'double, "timestamp" -> 'long) from
>   kafka(topic="metricStream_2")
> define ("logStream_3") from kafka(topic="logStream_3")
> // filter by function
> filter ("logStream_3") by { (line, collector) => collector.collect(line) } as
>   ("name" -> 'string, "value" -> 'double, "timestamp" -> 'long)
> // "logStream_3" as ("name" -> 'string, "value" -> 'double, "timestamp" -> 'long)
> // filter by pattern and rename stream
> filter("logStream_3" -> "logStream_3_parsed") by
>   """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as
>   ("name" -> 'string, "value" -> 'double, "timestamp" -> datetime("YYYY-MM-DD"))
> alert partitionBy "metricStream_1.metricType" parallelism 1 by {sql"""
>   from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
>   select sum(value) group by host output every 1 hour insert into alertStream;
> """}
> aggregate partitionBy "metricStream_1.metricType" parallelism 2 by {sql"""
>   from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
>   select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1;
> """}
> 'alertStream ~> kafka("alert_topic", zk=conf"kafka.zk.hosts")
> "alertStream" to mail(
>   from = "sen...@eagle.incubator.apache.org",
>   to = "recei...@eagle.incubator.apache.org",
>   smtp = "localhost:25",
>   template =
>     <html>
>       <head>
>         <title>Alert Notification</title>
>       </head>
>       <body>
>         <h1>Message</h1>
>         <p>$message</p>
>       </body>
>     </html>
> )
> // split stream by logic
> 'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
> 'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
> // # end
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)