[EAGLE-806] Integrate Metric Process and Persistence with Application Framework

Integrate Stream Source and Metric Persistence with Application Framework

- Provide API to easily ingest, process, aggregate and persist metric
- Integrate stream source
- Integrate metric definition and metric persistence.
- Implement basic plug-able aggregation abstraction for later usage.
- Provide `CEPFunction` as default `TransformFunction` based on `SiddhiCEP`

Author: Hao Chen <h...@apache.org>

Closes #692 from haoch/streamSourceAndPersist.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/10572c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/10572c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/10572c29

Branch: refs/heads/master
Commit: 10572c296f58532897cdea4f80132d1420890679
Parents: ff22537
Author: Hao Chen <h...@apache.org>
Authored: Wed Nov 30 09:38:11 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Wed Nov 30 09:38:11 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/StreamColumn.java  | 408 ++++++-------
 .../engine/coordinator/StreamDefinition.java    | 288 ++++-----
 eagle-core/eagle-app/eagle-app-base/pom.xml     |  11 +
 .../app/environment/AbstractEnvironment.java    |  30 +-
 .../eagle/app/environment/Environment.java      |   6 +-
 .../environment/builder/AggregateFunction.java  |  67 +++
 .../environment/builder/ApplicationBuilder.java | 122 ++++
 .../app/environment/builder/CEPFunction.java    |  92 +++
 .../app/environment/builder/Collector.java      |  23 +
 .../app/environment/builder/MaxFunction.java    |  48 ++
 .../environment/builder/MetricDefinition.java   | 165 ++++++
 .../builder/StormOutputCollector.java           |  36 ++
 .../environment/builder/TransformFunction.java  |  30 +
 .../builder/TransformFunctionBolt.java          |  66 +++
 .../app/environment/impl/StormEnvironment.java  |  34 +-
 .../app/messaging/DefaultStreamSinkConfig.java  |  42 ++
 .../eagle/app/messaging/FlattenEventMapper.java |  60 ++
 .../apache/eagle/app/messaging/JsonSchema.java  |  66 +++
 .../app/messaging/KafkaStreamProvider.java      | 130 +++++
 .../eagle/app/messaging/KafkaStreamSink.java    |  97 ++++
 .../app/messaging/KafkaStreamSinkConfig.java    | 115 ++++
 .../eagle/app/messaging/KafkaStreamSource.java  | 162 ++++++
 .../app/messaging/KafkaStreamSourceConfig.java  | 153 +++++
 .../app/messaging/MetricStreamPersist.java      | 152 +++++
 .../eagle/app/messaging/StormStreamSink.java    |  71 +++
 .../eagle/app/messaging/StormStreamSource.java  |  25 +
 .../eagle/app/messaging/StreamEventMapper.java  |  35 ++
 .../eagle/app/messaging/StreamProvider.java     |  50 ++
 .../apache/eagle/app/messaging/StreamSink.java  |  24 +
 .../eagle/app/messaging/StreamSource.java       |  23 +
 .../eagle/app/service/ApplicationAction.java    |  24 +-
 .../impl/ApplicationProviderSPILoader.java      |   2 +-
 .../eagle/app/sink/DefaultStreamSinkConfig.java |  42 --
 .../eagle/app/sink/FlattenEventMapper.java      |  60 --
 .../apache/eagle/app/sink/KafkaStreamSink.java  | 144 -----
 .../eagle/app/sink/KafkaStreamSinkConfig.java   | 109 ----
 .../eagle/app/sink/LoggingStreamSink.java       |  55 --
 .../apache/eagle/app/sink/StormStreamSink.java  |  93 ---
 .../eagle/app/sink/StreamEventMapper.java       |  35 --
 .../org/apache/eagle/app/sink/StreamSink.java   |  24 -
 .../eagle/app/sink/StreamSinkProvider.java      |  42 --
 .../eagle/app/spi/ApplicationProvider.java      |   2 +-
 .../eagle/app/utils/StreamConvertHelper.java    |  51 ++
 .../apache/eagle/app/TestStormApplication.java  |   5 +-
 .../app/environment/StaticEnvironmentTest.java  |   2 +-
 .../eagle/app/storm/MockStormApplication.java   |   2 +-
 .../eagle/app/stream/CEPFunctionTest.java       |  50 ++
 .../src/test/resources/application.conf         |   5 +-
 .../org/apache/eagle/common/utils/Tuple2.java   |  42 ++
 .../apache/eagle/metadata/model/StreamDesc.java |  21 +-
 .../metadata/model/StreamSourceConfig.java      |  27 +
 .../src/test/resources/application-test.xml     |  17 +
 .../jdbc/provider/JDBCDataSourceProvider.java   |   2 +-
 .../ApplicationEntityServiceJDBCImpl.java       |   4 +-
 .../service/client/EagleServiceConnector.java   |   3 +-
 .../client/impl/EagleServiceClientImpl.java     |  15 +-
 .../app/example/ExampleStormApplication.java    |   2 +-
 .../src/test/resources/application.conf         |   2 +-
 .../org/apache/eagle/gc/GCLogApplication.java   |   2 +-
 eagle-hadoop-metric/pom.xml                     |  16 +-
 .../eagle/metric/HadoopMetricMonitorApp.java    |  38 ++
 .../metric/HadoopMetricMonitorAppProdiver.java  |  12 +-
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |   4 +-
 .../HadoopMetricMonitorAppProdiverTest.java     |  84 ---
 .../metric/HadoopMetricMonitorAppDebug.java     |  23 +
 .../HadoopMetricMonitorAppProviderTest.java     |  84 +++
 .../eagle/metric/SendSampleDataToKafka.java     |  55 ++
 .../src/test/resources/application.conf         |  49 ++
 .../resources/hadoop_jmx_metric_sample.json     |   8 +
 ...adoopQueueRunningApplicationHealthCheck.java | 204 +++----
 .../jpm/aggregation/storm/AggregationSpout.java |   2 +-
 .../jpm/mr/history/MRHistoryJobApplication.java |   9 +-
 .../jpm/mr/running/MRRunningJobApplication.java |   4 +-
 .../src/test/resources/mrconf_30784.xml         |   2 +-
 .../SparkHistoryJobApplicationHealthCheck.java  | 198 +++----
 .../jpm/spark/running/SparkRunningJobApp.java   |   4 +-
 .../hbase/HBaseAuditLogApplication.java         |   2 +-
 .../AbstractHdfsAuditLogApplication.java        |   4 +-
 .../hive/HiveQueryMonitoringApplication.java    |   2 +-
 .../oozie/parse/OozieAuditLogApplication.java   |   2 +-
 eagle-server-assembly/src/main/conf/eagle.conf  |   4 +-
 .../src/main/resources/application.conf         |   4 +-
 .../apache/eagle/topology/TopologyCheckApp.java |   2 +-
 .../TopologyCheckApplicationHealthCheck.java    | 218 +++----
 .../hbase/HbaseTopologyEntityParser.java        | 330 +++++------
 .../hdfs/HdfsTopologyEntityParser.java          | 579 +++++++++----------
 .../extractor/mr/MRTopologyEntityParser.java    | 438 +++++++-------
 .../topology/storm/TopologyDataPersistBolt.java | 400 ++++++-------
 .../entity/HdfsServiceTopologyAPIEntity.java    | 246 ++++----
 89 files changed, 4118 insertions(+), 2424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 4628043..9705efc 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -1,205 +1,205 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-public class StreamColumn implements Serializable {
-
-    private static final long serialVersionUID = -5457861313624389106L;
-    private String name;
-    private Type type;
-    private Object defaultValue;
-    private boolean required;
-    private String description;
-    private String nodataExpression;
-
-    public String toString() {
-        return String.format("StreamColumn=name[%s], type=[%s], 
defaultValue=[%s], required=[%s], nodataExpression=[%s]",
-            name, type, defaultValue, required, nodataExpression);
-    }
-
-    public String getNodataExpression() {
-        return nodataExpression;
-    }
-
-    public void setNodataExpression(String nodataExpression) {
-        this.nodataExpression = nodataExpression;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
-    public Type getType() {
-        return type;
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
-    public Object getDefaultValue() {
-        return defaultValue;
-    }
-
-    private void ensureDefaultValueType() {
-        if (this.getDefaultValue() != null && (this.getDefaultValue() 
instanceof String) && this.getType() != Type.STRING) {
-            switch (this.getType()) {
-                case INT:
-                    this.setDefaultValue(Integer.valueOf((String) 
this.getDefaultValue()));
-                    break;
-                case LONG:
-                    this.setDefaultValue(Long.valueOf((String) 
this.getDefaultValue()));
-                    break;
-                case FLOAT:
-                    this.setDefaultValue(Float.valueOf((String) 
this.getDefaultValue()));
-                    break;
-                case DOUBLE:
-                    this.setDefaultValue(Double.valueOf((String) 
this.getDefaultValue()));
-                    break;
-                case BOOL:
-                    this.setDefaultValue(Boolean.valueOf((String) 
this.getDefaultValue()));
-                    break;
-                case OBJECT:
-                    try {
-                        this.setDefaultValue(new 
ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
-                    } catch (IOException e) {
-                        throw new IllegalArgumentException(e);
-                    }
-                    break;
-                default:
-                    throw new IllegalArgumentException("Illegal type: " + 
this.getType());
-            }
-        }
-    }
-
-    public void setDefaultValue(Object defaultValue) {
-        this.defaultValue = defaultValue;
-        ensureDefaultValueType();
-    }
-
-    public boolean isRequired() {
-        return required;
-    }
-
-    public void setRequired(boolean required) {
-        this.required = required;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public enum Type implements Serializable {
-        STRING("string"), INT("int"), LONG("long"), FLOAT("float"), 
DOUBLE("double"), BOOL("bool"), OBJECT("object");
-
-        private final String name;
-
-        Type(String name) {
-            this.name = name;
-        }
-
-        @Override
-        public String toString() {
-            return name;
-        }
-
-        @com.fasterxml.jackson.annotation.JsonCreator
-        public static Type getEnumFromValue(String value) {
-            for (Type testEnum : values()) {
-                if (testEnum.name.equalsIgnoreCase(value)) {
-                    return testEnum;
-                }
-            }
-            throw new IllegalArgumentException();
-        }
-    }
-
-    public static class StreamColumnTypeAdapter extends XmlAdapter<String, 
Type> {
-
-        @Override
-        public Type unmarshal(String v) throws Exception {
-            return Type.getEnumFromValue(v);
-        }
-
-        @Override
-        public String marshal(Type v) throws Exception {
-            return v.name;
-        }
-    }
-
-    public static class DefaultValueAdapter extends XmlAdapter<String, Object> 
{
-        @Override
-        public Object unmarshal(String v) throws Exception {
-            return v;
-        }
-
-        @Override
-        public String marshal(Object v) throws Exception {
-            return v.toString();
-        }
-    }
-
-    public static class Builder {
-        private StreamColumn column;
-
-        public Builder() {
-            column = new StreamColumn();
-        }
-
-        public Builder name(String name) {
-            column.setName(name);
-            return this;
-        }
-
-        public Builder type(Type type) {
-            column.setType(type);
-            return this;
-        }
-
-        public Builder defaultValue(Object defaultValue) {
-            column.setDefaultValue(defaultValue);
-            return this;
-        }
-
-        public Builder required(boolean required) {
-            column.setRequired(required);
-            return this;
-        }
-
-        public StreamColumn build() {
-            return column;
-        }
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+public class StreamColumn implements Serializable {
+
+    private static final long serialVersionUID = -5457861313624389106L;
+    private String name;
+    private Type type;
+    private Object defaultValue;
+    private boolean required;
+    private String description;
+    private String nodataExpression;
+
+    public String toString() {
+        return String.format("StreamColumn=name[%s], type=[%s], 
defaultValue=[%s], required=[%s], nodataExpression=[%s]",
+            name, type, defaultValue, required, nodataExpression);
+    }
+
+    public String getNodataExpression() {
+        return nodataExpression;
+    }
+
+    public void setNodataExpression(String nodataExpression) {
+        this.nodataExpression = nodataExpression;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
+    public Type getType() {
+        return type;
+    }
+
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
+    public Object getDefaultValue() {
+        return defaultValue;
+    }
+
+    private void ensureDefaultValueType() {
+        if (this.getDefaultValue() != null && (this.getDefaultValue() 
instanceof String) && this.getType() != Type.STRING) {
+            switch (this.getType()) {
+                case INT:
+                    this.setDefaultValue(Integer.valueOf((String) 
this.getDefaultValue()));
+                    break;
+                case LONG:
+                    this.setDefaultValue(Long.valueOf((String) 
this.getDefaultValue()));
+                    break;
+                case FLOAT:
+                    this.setDefaultValue(Float.valueOf((String) 
this.getDefaultValue()));
+                    break;
+                case DOUBLE:
+                    this.setDefaultValue(Double.valueOf((String) 
this.getDefaultValue()));
+                    break;
+                case BOOL:
+                    this.setDefaultValue(Boolean.valueOf((String) 
this.getDefaultValue()));
+                    break;
+                case OBJECT:
+                    try {
+                        this.setDefaultValue(new 
ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
+                    } catch (IOException e) {
+                        throw new IllegalArgumentException(e);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Illegal type: " + 
this.getType());
+            }
+        }
+    }
+
+    public void setDefaultValue(Object defaultValue) {
+        this.defaultValue = defaultValue;
+        ensureDefaultValueType();
+    }
+
+    public boolean isRequired() {
+        return required;
+    }
+
+    public void setRequired(boolean required) {
+        this.required = required;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public enum Type implements Serializable {
+        STRING("string"), INT("int"), LONG("long"), FLOAT("float"), 
DOUBLE("double"), BOOL("bool"), OBJECT("object");
+
+        private final String name;
+
+        Type(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        @com.fasterxml.jackson.annotation.JsonCreator
+        public static Type getEnumFromValue(String value) {
+            for (Type testEnum : values()) {
+                if (testEnum.name.equalsIgnoreCase(value)) {
+                    return testEnum;
+                }
+            }
+            throw new IllegalArgumentException();
+        }
+    }
+
+    public static class StreamColumnTypeAdapter extends XmlAdapter<String, 
Type> {
+
+        @Override
+        public Type unmarshal(String v) throws Exception {
+            return Type.getEnumFromValue(v);
+        }
+
+        @Override
+        public String marshal(Type v) throws Exception {
+            return v.name;
+        }
+    }
+
+    public static class DefaultValueAdapter extends XmlAdapter<String, Object> 
{
+        @Override
+        public Object unmarshal(String v) throws Exception {
+            return v;
+        }
+
+        @Override
+        public String marshal(Object v) throws Exception {
+            return v.toString();
+        }
+    }
+
+    public static class Builder {
+        private StreamColumn column;
+
+        public Builder() {
+            column = new StreamColumn();
+        }
+
+        public Builder name(String name) {
+            column.setName(name);
+            return this;
+        }
+
+        public Builder type(Type type) {
+            column.setType(type);
+            return this;
+        }
+
+        public Builder defaultValue(Object defaultValue) {
+            column.setDefaultValue(defaultValue);
+            return this;
+        }
+
+        public Builder required(boolean required) {
+            column.setRequired(required);
+            return this;
+        }
+
+        public StreamColumn build() {
+            return column;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index e0789f9..1be36f3 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -1,145 +1,145 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementWrapper;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is actually a data source schema.
- *
- * @since Apr 5, 2016
- */
-public class StreamDefinition implements Serializable {
-    private static final long serialVersionUID = 2352202882328931825L;
-
-    // Stream unique ID
-    private String streamId;
-
-    // Stream description
-    private String description;
-
-    // Is validateable or not
-    private boolean validate;
-
-    // Is timeseries-based stream or not
-    private boolean timeseries;
-
-    // TODO: Decouple dataSource and siteId from stream definition
-
-    // Stream data source ID
-    private String dataSource;
-
-    // Tenant (Site) ID
-    private String siteId;
-
-    private List<StreamColumn> columns = new ArrayList<>();
-
-    public String toString() {
-        return String.format("StreamDefinition[streamId=%s, dataSource=%s, 
description=%s, validate=%s, timeseries=%s, columns=%s",
-            streamId,
-            dataSource,
-            description,
-            validate,
-            timeseries,
-            columns);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public boolean isValidate() {
-        return validate;
-    }
-
-    public void setValidate(boolean validate) {
-        this.validate = validate;
-    }
-
-    public boolean isTimeseries() {
-        return timeseries;
-    }
-
-    public void setTimeseries(boolean timeseries) {
-        this.timeseries = timeseries;
-    }
-
-    @XmlElementWrapper(name = "columns")
-    @XmlElement(name = "column")
-    public List<StreamColumn> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(List<StreamColumn> columns) {
-        this.columns = columns;
-    }
-
-    public String getDataSource() {
-        return dataSource;
-    }
-
-    public void setDataSource(String dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public int getColumnIndex(String column) {
-        int i = 0;
-        for (StreamColumn col : this.getColumns()) {
-            if (col.getName().equals(column)) {
-                return i;
-            }
-            i++;
-        }
-        return -1;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    public StreamDefinition copy() {
-        StreamDefinition copied = new StreamDefinition();
-        copied.setColumns(this.getColumns());
-        copied.setDataSource(this.getDataSource());
-        copied.setDescription(this.getDescription());
-        copied.setSiteId(this.getSiteId());
-        copied.setStreamId(this.getStreamId());
-        copied.setTimeseries(this.isTimeseries());
-        copied.setValidate(this.isValidate());
-        return copied;
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is actually a data source schema.
+ *
+ * @since Apr 5, 2016
+ */
+public class StreamDefinition implements Serializable {
+    private static final long serialVersionUID = 2352202882328931825L;
+
+    // Stream unique ID
+    private String streamId;
+
+    // Stream description
+    private String description;
+
+    // Is validateable or not
+    private boolean validate;
+
+    // Is timeseries-based stream or not
+    private boolean timeseries;
+
+    // TODO: Decouple dataSource and siteId from stream definition
+
+    // Stream data source ID
+    private String dataSource;
+
+    // Tenant (Site) ID
+    private String siteId;
+
+    private List<StreamColumn> columns = new ArrayList<>();
+
+    public String toString() {
+        return String.format("StreamDefinition[streamId=%s, dataSource=%s, 
description=%s, validate=%s, timeseries=%s, columns=%s",
+            streamId,
+            dataSource,
+            description,
+            validate,
+            timeseries,
+            columns);
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public boolean isValidate() {
+        return validate;
+    }
+
+    public void setValidate(boolean validate) {
+        this.validate = validate;
+    }
+
+    public boolean isTimeseries() {
+        return timeseries;
+    }
+
+    public void setTimeseries(boolean timeseries) {
+        this.timeseries = timeseries;
+    }
+
+    @XmlElementWrapper(name = "columns")
+    @XmlElement(name = "column")
+    public List<StreamColumn> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(List<StreamColumn> columns) {
+        this.columns = columns;
+    }
+
+    public String getDataSource() {
+        return dataSource;
+    }
+
+    public void setDataSource(String dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public int getColumnIndex(String column) {
+        int i = 0;
+        for (StreamColumn col : this.getColumns()) {
+            if (col.getName().equals(column)) {
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public StreamDefinition copy() {
+        StreamDefinition copied = new StreamDefinition();
+        copied.setColumns(this.getColumns());
+        copied.setDataSource(this.getDataSource());
+        copied.setDescription(this.getDescription());
+        copied.setSiteId(this.getSiteId());
+        copied.setStreamId(this.getStreamId());
+        copied.setTimeseries(this.isTimeseries());
+        copied.setValidate(this.isValidate());
+        return copied;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml 
b/eagle-core/eagle-app/eagle-app-base/pom.xml
index a5e295e..21fdfae 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -114,6 +114,17 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-client-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>xercesImpl</artifactId>
+                    <groupId>xerces</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-junit4</artifactId>
             <version>${powermock.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
index 02c130a..5032aa6 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
@@ -16,8 +16,7 @@
  */
 package org.apache.eagle.app.environment;
 
-import org.apache.eagle.app.sink.KafkaStreamSink;
-import org.apache.eagle.app.sink.StreamSinkProvider;
+import org.apache.eagle.app.messaging.*;
 import com.typesafe.config.Config;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
@@ -26,25 +25,25 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractEnvironment implements Environment {
 
     private final Config config;
-    private final StreamSinkProvider sinkProvider;
-    private static final String APPLICATIONS_SINK_TYPE_PROPS_KEY = 
"application.sink.provider";
-    private static final String DEFAULT_APPLICATIONS_SINK_TYPE = 
KafkaStreamSink.Provider.class.getName();
+    private final StreamProvider streamProvider;
+    private static final String APPLICATIONS_MESSAGING_TYPE_PROPS_KEY = 
"application.stream.provider";
+    private static final String DEFAULT_APPLICATIONS_MESSAGING_TYPE = 
KafkaStreamProvider.class.getName();
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractEnvironment.class);
 
     public AbstractEnvironment(Config config) {
         this.config = config;
-        this.sinkProvider = loadStreamSinkProvider();
+        this.streamProvider = loadStreamProvider();
     }
 
-    private StreamSinkProvider loadStreamSinkProvider() {
-        String sinkProviderClassName = 
config.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY)
-            ? config.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : 
DEFAULT_APPLICATIONS_SINK_TYPE;
+    private StreamProvider loadStreamProvider() {
+        String sinkProviderClassName = 
config.hasPath(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY)
+            ? config.getString(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY) : 
DEFAULT_APPLICATIONS_MESSAGING_TYPE;
         try {
             Class<?> sinkProviderClass = Class.forName(sinkProviderClassName);
-            if (!StreamSinkProvider.class.isAssignableFrom(sinkProviderClass)) 
{
-                throw new IllegalStateException(sinkProviderClassName + "is 
not assignable from " + StreamSinkProvider.class.getCanonicalName());
+            if (!StreamProvider.class.isAssignableFrom(sinkProviderClass)) {
+                throw new IllegalStateException(sinkProviderClassName + "is 
not assignable from " + StreamProvider.class.getCanonicalName());
             }
-            StreamSinkProvider instance = (StreamSinkProvider) 
sinkProviderClass.newInstance();
+            StreamProvider instance = (StreamProvider) 
sinkProviderClass.newInstance();
             LOGGER.info("Loaded {}", instance);
             return instance;
         } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
@@ -60,12 +59,13 @@ public abstract class AbstractEnvironment implements 
Environment {
             .append(this.config()).build();
     }
 
-    public StreamSinkProvider streamSink() {
-        return sinkProvider;
+    public StreamProvider stream() {
+        return streamProvider;
     }
 
+
     @Override
     public Config config() {
         return config;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
index 29e90d9..db87693 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.app.environment;
 
-import org.apache.eagle.app.sink.StreamSinkProvider;
+import org.apache.eagle.app.messaging.StreamProvider;
 import com.typesafe.config.Config;
 
 import java.io.Serializable;
@@ -31,7 +31,7 @@ public interface Environment extends Serializable {
     /**
      * TODO Only useful for Storm/Spark Exeuctable Application instead of 
static web application.
      *
-     * @return StreamSinkProvider.
+     * @return StreamProvider.
      */
-    StreamSinkProvider streamSink();
+    StreamProvider stream();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
new file mode 100644
index 0000000..836300c
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AggregateFunction implements TransformFunction {
+    private String aggFieldName;
+    private String resultFieldName;
+    private List<String> groupByFieldNames;
+    private long windowLengthMs;
+
+    public List<String> getGroupByFieldNames() {
+        return groupByFieldNames;
+    }
+
+    public void setGroupByFieldNames(List<String> groupByFieldNames) {
+        this.groupByFieldNames = groupByFieldNames;
+    }
+
+    public String getResultFieldName() {
+        return resultFieldName;
+    }
+
+    public void setResultFieldName(String resultFieldName) {
+        this.resultFieldName = resultFieldName;
+    }
+
+    public String getAggFieldName() {
+        return aggFieldName;
+    }
+
+    public void setAggFieldName(String aggFieldName) {
+        this.aggFieldName = aggFieldName;
+    }
+
+    public AggregateFunction asField(String resultFieldName) {
+        this.setResultFieldName(resultFieldName);
+        return this;
+    }
+
+    public AggregateFunction groupBy(String... groupByFieldNames) {
+        this.setGroupByFieldNames(Arrays.asList(groupByFieldNames));
+        return this;
+    }
+
+    public AggregateFunction windowBy(long windowLength, TimeUnit timeUnit) {
+        this.windowLengthMs = timeUnit.toMillis(windowLength);
+        return this;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
new file mode 100644
index 0000000..83f00db
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricStreamPersist;
+import org.apache.eagle.app.messaging.StormStreamSource;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Storm Application Builder DSL.
+ */
+public class ApplicationBuilder {
+    private final StormEnvironment environment;
+    private final Config appConfig;
+    private final TopologyBuilder topologyBuilder;
+    private final AtomicInteger identifier;
+
+    public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
+        this.appConfig = appConfig;
+        this.environment = environment;
+        this.identifier = new AtomicInteger(0);
+        this.topologyBuilder = new TopologyBuilder();
+    }
+
+    public class BuilderContext {
+        public StormTopology toTopology() {
+            return topologyBuilder.createTopology();
+        }
+    }
+
+    public abstract class InitializedStream extends BuilderContext {
+        private String id;
+
+        InitializedStream(String id) {
+            Preconditions.checkNotNull(id);
+            this.id = id;
+        }
+
+        String getId() {
+            return this.id;
+        }
+
+        /**
+         * Persist source data stream as metric.
+         */
+        public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
+            topologyBuilder.setBolt(generateId("MetricPersist"), new 
MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+            return this;
+        }
+
+        public TransformedStream transformBy(TransformFunction function) {
+            String componentId = generateId(function.getName());
+            topologyBuilder.setBolt(componentId, new 
TransformFunctionBolt(function)).shuffleGrouping(getId());
+            return new TransformedStream(componentId);
+        }
+    }
+
+    public class SourcedStream extends InitializedStream {
+        private final Config appConfig;
+        private final StormStreamSource streamSource;
+
+        private SourcedStream(SourcedStream withSourcedStream) {
+            this(withSourcedStream.getId(), withSourcedStream.appConfig, 
withSourcedStream.streamSource);
+        }
+
+        private SourcedStream(String componentId, Config appConfig, 
StormStreamSource streamSource) {
+            super(componentId);
+            this.appConfig = appConfig;
+            this.streamSource = streamSource;
+            topologyBuilder.setSpout(componentId, streamSource);
+        }
+    }
+
+    public class TransformedStream extends InitializedStream {
+        public TransformedStream(String id) {
+            super(id);
+            throw new IllegalStateException("TODO: Not implemented yet");
+        }
+    }
+
+    public TopologyBuilder getTopologyBuilder() {
+        return this.topologyBuilder;
+    }
+
+    public StormTopology createTopology() {
+        return topologyBuilder.createTopology();
+    }
+
+
+    public SourcedStream fromStream(String streamId) {
+        return new SourcedStream(generateId("SourcedStream-" + streamId), 
this.appConfig, environment.getStreamSource(streamId, this.appConfig));
+    }
+
+    public SourcedStream fromStream(SourcedStream sourcedStream) {
+        return new SourcedStream(sourcedStream);
+    }
+
+    private String generateId(String prefix) {
+        return String.format("%s_%s", prefix, 
this.identifier.getAndIncrement());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
new file mode 100644
index 0000000..dd3b214
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+/**
+ * TODO: Not implemented yet.
+ */
+public class CEPFunction implements TransformFunction {
+
+    private final CEPDefinition cepDefinition;
+    private Collector collector;
+
+    public CEPFunction(CEPDefinition cepDefinition) {
+        this.cepDefinition = cepDefinition;
+    }
+
+    public CEPFunction(String siddhiQuery, String inputStreamId, String 
outputStreamId) {
+        this.cepDefinition = new CEPDefinition(siddhiQuery,inputStreamId, 
outputStreamId);
+    }
+
+    @Override
+    public String getName() {
+        return "CEPFunction";
+    }
+
+    @Override
+    public void open(Collector collector) {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    @Override
+    public void transform(Map event) {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    @Override
+    public void close() {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    public static class CEPDefinition {
+        private String inputStreamId;
+        private String outputStreamId;
+        private String siddhiQuery;
+
+        public CEPDefinition(String siddhiQuery, String inputStreamId, String 
outputStreamId) {
+            this.siddhiQuery = siddhiQuery;
+            this.inputStreamId = inputStreamId;
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getSiddhiQuery() {
+            return siddhiQuery;
+        }
+
+        public void setSiddhiQuery(String siddhiQuery) {
+            this.siddhiQuery = siddhiQuery;
+        }
+
+        public String getOutputStreamId() {
+            return outputStreamId;
+        }
+
+        public void setOutputStreamId(String outputStreamId) {
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getInputStreamId() {
+            return inputStreamId;
+        }
+
+        public void setInputStreamId(String inputStreamId) {
+            this.inputStreamId = inputStreamId;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
new file mode 100644
index 0000000..f86d101
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+public interface Collector {
+    void collect(Object key, Map event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
new file mode 100644
index 0000000..04c5bf9
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+public class MaxFunction extends AggregateFunction {
+    @Override
+    public String getName() {
+        return "MAX";
+    }
+
+    @Override
+    public void open(Collector collector) {
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+
+    @Override
+    public void transform(Map event) {
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+
+    @Override
+    public void close() {
+
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+
+    public static MaxFunction maxOf(String aggFieldName) {
+        MaxFunction function = new MaxFunction();
+        function.setAggFieldName(aggFieldName);
+        return function;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
new file mode 100644
index 0000000..62a81b0
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MetricDefinition implements Serializable {
+
+    /**
+     * Support simple and complex name format, by default using "metric" field.
+     */
+    private NameSelector nameSelector = new FieldNameSelector("metric");
+
+    /**
+     * Support event/system time, by default using system time.
+     */
+    private TimestampSelector timestampSelector = new 
SystemTimestampSelector();
+
+    /**
+     * Metric dimension field name.
+     */
+    private List<String> dimensionFields;
+
+    /**
+     * Metric value field name.
+     */
+    private String valueField = "value";
+
+    public NameSelector getNameSelector() {
+        return nameSelector;
+    }
+
+    public void setNameSelector(NameSelector nameSelector) {
+        this.nameSelector = nameSelector;
+    }
+
+    public String getValueField() {
+        return valueField;
+    }
+
+    public void setValueField(String valueField) {
+        this.valueField = valueField;
+    }
+
+    public List<String> getDimensionFields() {
+        return dimensionFields;
+    }
+
+    public void setDimensionFields(List<String> dimensionFields) {
+        this.dimensionFields = dimensionFields;
+    }
+
+    public TimestampSelector getTimestampSelector() {
+        return timestampSelector;
+    }
+
+    public void setTimestampSelector(TimestampSelector timestampSelector) {
+        this.timestampSelector = timestampSelector;
+    }
+
+
+    @FunctionalInterface
+    public interface NameSelector extends Serializable {
+        String getMetricName(Map event);
+    }
+
+    @FunctionalInterface
+    public interface TimestampSelector extends Serializable {
+        Long getTimestamp(Map event);
+    }
+
+    public static MetricDefinition namedBy(NameSelector nameSelector) {
+        MetricDefinition metricDefinition = new MetricDefinition();
+        metricDefinition.setNameSelector(nameSelector);
+        return metricDefinition;
+    }
+
+    public static MetricDefinition namedByField(String nameField) {
+        MetricDefinition metricDefinition = new MetricDefinition();
+        metricDefinition.setNameSelector(new FieldNameSelector(nameField));
+        return metricDefinition;
+    }
+
+    public MetricDefinition eventTimeByField(String timestampField) {
+        this.setTimestampSelector(new EventTimestampSelector(timestampField));
+        return this;
+    }
+
+    public MetricDefinition dimensionFields(String... dimensionFields) {
+        this.setDimensionFields(Arrays.asList(dimensionFields));
+        return this;
+    }
+
+    public MetricDefinition valueField(String valueField) {
+        this.setValueField(valueField);
+        return this;
+    }
+
+    public class EventTimestampSelector implements TimestampSelector {
+        private final String timestampField;
+
+        EventTimestampSelector(String timestampField) {
+            this.timestampField = timestampField;
+        }
+
+        @Override
+        public Long getTimestamp(Map event) {
+            if (event.containsKey(timestampField)) {
+                Object timestampValue = event.get(timestampField);
+                if (timestampValue instanceof Integer) {
+                    return Long.valueOf((Integer) timestampValue);
+                }
+                if (timestampValue instanceof String) {
+                    return Long.valueOf((String) timestampValue);
+                } else {
+                    return (Long) timestampValue;
+                }
+            } else {
+                throw new IllegalArgumentException("Timestamp field '" + 
timestampField + "' not exists");
+            }
+        }
+    }
+
+    public static class SystemTimestampSelector implements TimestampSelector {
+        @Override
+        public Long getTimestamp(Map event) {
+            return System.currentTimeMillis();
+        }
+    }
+
+    public static class FieldNameSelector implements NameSelector {
+        private final String fieldName;
+
+        FieldNameSelector(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getMetricName(Map event) {
+            if (event.containsKey(fieldName)) {
+                return (String) event.get(fieldName);
+            } else {
+                throw new IllegalArgumentException("Metric name field '" + 
fieldName + "' not exists: " + event);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
new file mode 100644
index 0000000..9135cc8
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import backtype.storm.task.OutputCollector;
+
+import java.util.Arrays;
+
+import java.util.Map;
+
+public class StormOutputCollector implements Collector {
+    private final OutputCollector delegate;
+
+    StormOutputCollector(OutputCollector delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void collect(Object key, Map event) {
+        delegate.emit(Arrays.asList(key, event));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
new file mode 100644
index 0000000..11974ff
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface TransformFunction extends Serializable {
+    String getName();
+
+    void open(Collector collector);
+
+    void transform(Map event);
+
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
new file mode 100644
index 0000000..dbc7239
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class TransformFunctionBolt extends BaseRichBolt {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransformFunctionBolt.class);
+    private final TransformFunction function;
+    private OutputCollector collector;
+
+    public TransformFunctionBolt(TransformFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+        this.function.open(new StormOutputCollector(collector));
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        try {
+            
this.function.transform(StreamConvertHelper.tupleToEvent(input).f1());
+            this.collector.ack(input);
+        } catch (Throwable throwable) {
+            LOG.error("Transform error: {}", input, throwable);
+            this.collector.reportError(throwable);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1","f2"));
+    }
+
+    @Override
+    public void cleanup() {
+        this.function.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 18675a6..ae52cd0 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -17,8 +17,13 @@
 package org.apache.eagle.app.environment.impl;
 
 import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.environment.builder.ApplicationBuilder;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.TransformFunction;
+import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
+import org.apache.eagle.app.messaging.*;
 import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
 
 /**
  * Storm Execution Environment Context.
@@ -28,7 +33,30 @@ public class StormEnvironment extends AbstractEnvironment {
         super(envConfig);
     }
 
+    // ----------------------------------
+    // Classic Storm Topology Builder API
+    // ----------------------------------
     public StormStreamSink getStreamSink(String streamId, Config config) {
-        return ((StormStreamSink) streamSink().getSink(streamId,config));
+        return ((StormStreamSink) stream().getSink(streamId,config));
     }
-}
+
+    public StormStreamSource getStreamSource(String streamId, Config config) {
+        return (StormStreamSource) stream().getSource(streamId,config);
+    }
+
+    public MetricStreamPersist getMetricPersist(MetricDefinition 
metricDefinition, Config config) {
+        return new MetricStreamPersist(metricDefinition, config);
+    }
+
+    public TransformFunctionBolt getTransformer(TransformFunction function) {
+        return new TransformFunctionBolt(function);
+    }
+
+    // ----------------------------------
+    // Fluent Storm App Builder API
+    // ----------------------------------
+
+    public ApplicationBuilder newApp(Config appConfig) {
+        return new ApplicationBuilder(appConfig, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
new file mode 100644
index 0000000..d1cecc9
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+public class DefaultStreamSinkConfig implements StreamSinkConfig {
+    private final Class<?> streamPersistClass;
+    private static final String NONE_STORAGE_TYPE = "NONE";
+
+    public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
+        this.streamPersistClass = streamPersistClass;
+    }
+
+    @Override
+    public String getType() {
+        return NONE_STORAGE_TYPE;
+    }
+
+    public Class<?> getSinkType() {
+        return streamPersistClass;
+    }
+
+    @Override
+    public Class<? extends StreamSinkConfig> getConfigType() {
+        return DefaultStreamSinkConfig.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
new file mode 100644
index 0000000..c8fe1b5
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FlattenEventMapper implements StreamEventMapper {
+    private final String streamId;
+    private static final String TIMESTAMP_FIELD = "timestamp";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlattenEventMapper.class);
+
+    public FlattenEventMapper(String streamId) {
+        this.streamId = streamId;
+    }
+
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        long timestamp;
+        if (tuple.getFields().contains(TIMESTAMP_FIELD)) {
+            try {
+                timestamp = tuple.getLongByField("timestamp");
+            } catch (Exception ex) {
+                // if timestamp is not null
+                LOGGER.error(ex.getMessage(), ex);
+                timestamp = 0;
+            }
+        } else {
+            timestamp = System.currentTimeMillis();
+        }
+        Object[] values = new Object[tuple.getFields().size()];
+        for (int i = 0; i < tuple.getFields().size(); i++) {
+            values[i] = tuple.getValue(i);
+        }
+        StreamEvent event = new StreamEvent();
+        event.setTimestamp(timestamp);
+        event.setStreamId(streamId);
+        event.setData(values);
+        return Collections.singletonList(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
new file mode 100644
index 0000000..987ed0b
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * General Json Schema.
+ * Different from org.apache.eagle.alert.engine.scheme.JsonScheme which is 
just to multi-topic cases.
+ *
+ * @see org.apache.eagle.alert.engine.scheme.JsonScheme
+ */
+public class JsonSchema implements Scheme {
+    private static final long serialVersionUID = -8352896475656975577L;
+    private static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(JsonSchema.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("f1","f2");
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Object> deserialize(byte[] ser) {
+        try {
+            if (ser != null) {
+                Map map = mapper.readValue(ser, Map.class);
+                return Arrays.asList(map.hashCode(), map);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Content is null, ignore");
+                }
+            }
+        } catch (IOException e) {
+            try {
+                LOG.error("Failed to deserialize as JSON: {}", new String(ser, 
"UTF-8"), e);
+            } catch (Exception ex) {
+                LOG.error(ex.getMessage(), ex);
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
new file mode 100644
index 0000000..13080a1
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, 
KafkaStreamSinkConfig,KafkaStreamSource,KafkaStreamSourceConfig> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaStreamProvider.class);
+    private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = 
"dataSinkConfig.topic";
+    private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = 
"dataSourceConfig.topic";
+
+    private String getSinkTopicName(String streamId, Config config) {
+        String streamSpecificTopicConfigKey = 
String.format("dataSinkConfig.%s.topic",streamId);
+        if (config.hasPath(streamSpecificTopicConfigKey)) {
+            return config.getString(streamSpecificTopicConfigKey);
+        } else if (config.hasPath(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)) {
+            LOG.warn("Using default shared sink topic {}: {}", 
DEFAULT_SHARED_SINK_TOPIC_CONF_KEY, 
config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY));
+            return config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
+        } else {
+            LOG.error("Neither stream specific topic: {} nor default shared 
topic: {} found in config", streamSpecificTopicConfigKey, 
DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
+            throw new IllegalArgumentException("Neither stream specific topic: 
"
+                + streamSpecificTopicConfigKey + " nor default shared topic: " 
+ DEFAULT_SHARED_SINK_TOPIC_CONF_KEY + " found in config");
+        }
+    }
+
+    private String getSourceTopicName(String streamId, Config config) {
+        String streamSpecificTopicConfigKey = 
String.format("dataSourceConfig.%s.topic",streamId);;
+        if (config.hasPath(streamSpecificTopicConfigKey)) {
+            return config.getString(streamSpecificTopicConfigKey);
+        } else if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) {
+            LOG.warn("Using default shared source topic {}: {}", 
DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY, 
config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY));
+            return config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY);
+        } else {
+            LOG.debug("Neither stream specific topic: {} nor default shared 
topic: {} found in config, try sink config instead", 
streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
+            return getSinkTopicName(streamId,config);
+        }
+    }
+
+    @Override
+    public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) 
{
+        KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
+        sinkConfig.setTopicId(getSinkTopicName(streamId,config));
+        
sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList"));
+        
sinkConfig.setSerializerClass(config.hasPath("dataSinkConfig.serializerClass")
+            ? config.getString("dataSinkConfig.serializerClass") : 
"kafka.serializer.StringEncoder");
+        
sinkConfig.setKeySerializerClass(config.hasPath("dataSinkConfig.keySerializerClass")
+            ? config.getString("dataSinkConfig.keySerializerClass") : 
"kafka.serializer.StringEncoder");
+
+        // new added properties for async producer
+        
sinkConfig.setNumBatchMessages(config.hasPath("dataSinkConfig.numBatchMessages")
+            ? config.getString("dataSinkConfig.numBatchMessages") : "1024");
+        
sinkConfig.setProducerType(config.hasPath("dataSinkConfig.producerType")
+            ? config.getString("dataSinkConfig.producerType") : "async");
+        
sinkConfig.setMaxQueueBufferMs(config.hasPath("dataSinkConfig.maxQueueBufferMs")
+            ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000");
+        
sinkConfig.setRequestRequiredAcks(config.hasPath("dataSinkConfig.requestRequiredAcks")
+            ? config.getString("dataSinkConfig.requestRequiredAcks") : "1");
+
+        return sinkConfig;
+    }
+
+    @Override
+    public KafkaStreamSink getSink() {
+        return new KafkaStreamSink();
+    }
+
+    @Override
+    public KafkaStreamSourceConfig getSourceConfig(String streamId, Config 
config) {
+        KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig();
+
+        sourceConfig.setTopicId(getSourceTopicName(streamId,config));
+        
sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection"));
+
+        if (config.hasPath("dataSourceConfig.fetchSize")) {
+            
sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize"));
+        }
+        if (config.hasPath("dataSourceConfig.transactionZKRoot")) {
+            
sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot"));
+        }
+        if (config.hasPath("dataSourceConfig.consumerGroupId")) {
+            
sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId"));
+        }
+        if (config.hasPath("dataSourceConfig.brokerZkPath")) {
+            
sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath"));
+        }
+        if (config.hasPath("dataSourceConfig.txZkServers")) {
+            
sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers"));
+        }
+        if (config.hasPath("dataSourceConfig.transactionStateUpdateMS")) {
+            
sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS"));
+        }
+        if (config.hasPath("dataSourceConfig.startOffsetTime")) {
+            
sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime"));
+        }
+        if (config.hasPath("dataSourceConfig.forceFromStart")) {
+            
sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
+        }
+        if (config.hasPath("dataSourceConfig.schemeCls")) {
+            try {
+                sourceConfig.setSchemaClass((Class<? extends Scheme>) 
Class.forName(config.getString("dataSourceConfig.schemeCls")));
+            } catch (ClassNotFoundException e) {
+                LOG.error("Class not found error, dataSourceConfig.schemeCls = 
{}",config.getString("dataSourceConfig.schemeCls"),e);
+            }
+        }
+        return sourceConfig;
+    }
+
+    @Override
+    public KafkaStreamSource getSource() {
+        return new KafkaStreamSource();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
new file mode 100644
index 0000000..696d79f
--- /dev/null
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaStreamSink.class);
+    private String topicId;
+    private Producer producer;
+    private KafkaStreamSinkConfig config;
+
+    @Override
+    public void init(String streamId, KafkaStreamSinkConfig config) {
+        super.init(streamId, config);
+        this.topicId = config.getTopicId();
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        Properties properties = new Properties();
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        // new added properties for async producer
+        properties.put("producer.type", config.getProducerType());
+        properties.put("batch.num.messages", config.getNumBatchMessages());
+        properties.put("request.required.acks", 
config.getRequestRequiredAcks());
+        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
+        ProducerConfig producerConfig = new ProducerConfig(properties);
+        producer = new Producer(producerConfig);
+    }
+
+    @Override
+    protected void execute(Object key, Map event, OutputCollector collector) 
throws Exception {
+        try {
+            String output = new ObjectMapper().writeValueAsString(event);
+            // partition key may cause data skew
+            //producer.send(new KeyedMessage(this.topicId, key, output));
+            producer.send(new KeyedMessage(this.topicId, output));
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            throw ex;
+        }
+    }
+
+    @Override
+    public void afterInstall() {
+        ensureTopicCreated();
+    }
+
+    private void ensureTopicCreated() {
+        LOG.info("TODO: ensure kafka topic {} created", this.topicId);
+    }
+
+    private void ensureTopicDeleted() {
+        LOG.info("TODO: ensure kafka topic {} deleted", this.topicId);
+    }
+
+    @Override
+    public void cleanup() {
+        if (this.producer != null) {
+            this.producer.close();
+        }
+    }
+
+    @Override
+    public void afterUninstall() {
+        ensureTopicDeleted();
+    }
+}
\ No newline at end of file

Reply via email to