[ https://issues.apache.org/jira/browse/EAGLE-98?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068506#comment-15068506 ]
Edward Zhang commented on EAGLE-98:
-----------------------------------

Where should the Grok logic go if we want to parse raw logs using only a configured Grok expression?

> Eagle Declarative Topology Definition DSL
> -----------------------------------------
>
> Key: EAGLE-98
> URL: https://issues.apache.org/jira/browse/EAGLE-98
> Project: Eagle
> Issue Type: New Feature
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h2. Features
> * High-level, stream-oriented
> * Declarative streaming
> * Metadata-driven
> * Native Scala internal DSL
> * Supports programming in Scala, or scripting/configuration in *.egl files
> * Supports static policy definitions and a dynamic policy loader
> * IDE-friendly features, such as the sql"""...""" string prefix and XML literals as email templates
> h2. Syntax
> {code}
> // Pluggable DSL interfaces
> import org.apache.eagle.stream.dsl.AggregateInterface._
> import org.apache.eagle.stream.dsl.AlertInterface._
> import org.apache.eagle.stream.dsl.DefineInterface._
> import org.apache.eagle.stream.dsl.DruidInterface._
> import org.apache.eagle.stream.dsl.KafkaInterface._
> // Define a topology either by extending the API or as a script
> // #!/bin/bash
> // exec scala "$0" "$@"
> // !#
> // # start
> define ("metricStream_1") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts")
> define ("metricStream_2") as ("name" -> 'string, "value"->'double,
> "timestamp"->'long) from kafka(topic="metricStream_2")
> alert partitionBy "metricStream_1.metricType" parallism 1 by sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into alertStream;
> """
> aggregate partitionBy "metricStream_1.metricType" parallism 2 by sql"""
> from metricStream_1[component=='dn' and
> metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
> select sum(value) group by host output every 1 hour insert into
> aggregatedMetricStream_1;
> """
> "alertStream" ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
> "alertStream" to mail(
> from = "sen...@eagle.incubator.apache.org",
> to = "recei...@eagle.incubator.apache.org",
> smtp = "localhost:25",
> template =
> <html>
> <head>
> <title>Alert Notification</title>
> </head>
> <body>
> <h1>Message</h1>
> <p>$message</p>
> </body>
> </html>
> )
> // Split a stream by condition
> "aggregatedMetricStream_1" to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
> "aggregatedMetricStream_1" ~> druid("aggregated_stream_nn") where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
> // # end
> {code}
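The question about where Grok parsing belongs can be explored with a small, self-contained sketch. It assumes that Grok parsing runs as a separate step that turns each raw log line into a field map, which is then coerced to the types declared in a `define (...) as (...)` schema. `Grok`, `CompiledGrok`, `coerce`, and the five-entry pattern table are hypothetical names introduced only for illustration. They are not part of the Eagle DSL or of any existing Grok library, and the `%{PATTERN:field}` syntax handled here is a simplified subset of Grok.

```scala
import java.util.regex.Pattern

// HYPOTHETICAL sketch, not an Eagle or Grok-library API: compiles a Grok-style
// expression such as "%{IP:host} %{NUMBER:value}" into a java.util.regex.Pattern
// with named capture groups, then parses raw log lines into field maps.

/** A compiled expression: the regex plus the field names it captures. */
final case class CompiledGrok(pattern: Pattern, fields: Seq[String]) {
  /** Returns the captured fields if the whole line matches, otherwise None. */
  def parse(line: String): Option[Map[String, String]] = {
    val m = pattern.matcher(line)
    if (m.matches()) Some(fields.map(f => f -> m.group(f)).toMap) else None
  }
}

object Grok {
  // Assumed tiny base-pattern table; real Grok libraries ship far more patterns.
  val basePatterns: Map[String, String] = Map(
    "WORD"       -> "\\b\\w+\\b",
    "NUMBER"     -> "[+-]?\\d+(?:\\.\\d+)?",
    "IP"         -> "(?:\\d{1,3}\\.){3}\\d{1,3}",
    "DATA"       -> ".*?",
    "GREEDYDATA" -> ".*"
  )

  // Matches %{NAME} or %{NAME:field}. Java named groups must start with a
  // letter and contain only letters and digits, so field names are limited to that.
  private val token = Pattern.compile("%\\{(\\w+)(?::([a-zA-Z][a-zA-Z0-9]*))?\\}")

  /** Translates a Grok-style expression into a regex with named groups. */
  def compile(expr: String): CompiledGrok = {
    val m = token.matcher(expr)
    val regex = new StringBuilder
    var fields = Vector.empty[String]
    var last = 0
    while (m.find()) {
      // Literal text between tokens is matched verbatim.
      regex.append(Pattern.quote(expr.substring(last, m.start())))
      val base = basePatterns.getOrElse(m.group(1),
        throw new IllegalArgumentException(s"Unknown Grok pattern: ${m.group(1)}"))
      Option(m.group(2)) match {
        case Some(field) =>
          fields :+= field
          regex.append(s"(?<$field>$base)") // named capture for %{NAME:field}
        case None =>
          regex.append(s"(?:$base)")        // non-capturing for bare %{NAME}
      }
      last = m.end()
    }
    regex.append(Pattern.quote(expr.substring(last)))
    CompiledGrok(Pattern.compile(regex.toString), fields)
  }

  /** Converts string fields to declared types ("double", "long"); anything else stays a String. */
  def coerce(fields: Map[String, String], schema: Map[String, String]): Map[String, Any] =
    fields.map { case (k, v) =>
      k -> (schema.getOrElse(k, "string") match {
        case "double" => v.toDouble
        case "long"   => v.toLong
        case _        => v
      })
    }
}
```

For example, `Grok.compile("%{IP:host} %{WORD:metric} %{NUMBER:value}").parse("10.0.0.1 RpcQueueTimeNumOps 42.5")` yields a map with `host`, `metric`, and `value` entries, and `coerce` with `Map("value" -> "double")` turns `value` into a `Double`. Because the expression is plain data in this sketch, it could be read from configuration instead of being compiled into the topology. Where such a step should sit in the DSL is still the open question raised in the comment.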