[EAGLE-806] Integrate Metric Process and Persistence with Application Framework
Integrate Stream Source and Metric Persistence with Application Framework - Provide API to easily ingest, process, aggregate and persist metric - Integrate stream source - Integrate metric definition and metric persistence. - Implement basic plug-able aggregation abstraction for later usage. - Provide `CEPFunction` as default `TransformFunction` based on `SiddhiCEP` Author: Hao Chen <h...@apache.org> Closes #692 from haoch/streamSourceAndPersist. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/10572c29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/10572c29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/10572c29 Branch: refs/heads/master Commit: 10572c296f58532897cdea4f80132d1420890679 Parents: ff22537 Author: Hao Chen <h...@apache.org> Authored: Wed Nov 30 09:38:11 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Wed Nov 30 09:38:11 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/StreamColumn.java | 408 ++++++------- .../engine/coordinator/StreamDefinition.java | 288 ++++----- eagle-core/eagle-app/eagle-app-base/pom.xml | 11 + .../app/environment/AbstractEnvironment.java | 30 +- .../eagle/app/environment/Environment.java | 6 +- .../environment/builder/AggregateFunction.java | 67 +++ .../environment/builder/ApplicationBuilder.java | 122 ++++ .../app/environment/builder/CEPFunction.java | 92 +++ .../app/environment/builder/Collector.java | 23 + .../app/environment/builder/MaxFunction.java | 48 ++ .../environment/builder/MetricDefinition.java | 165 ++++++ .../builder/StormOutputCollector.java | 36 ++ .../environment/builder/TransformFunction.java | 30 + .../builder/TransformFunctionBolt.java | 66 +++ .../app/environment/impl/StormEnvironment.java | 34 +- .../app/messaging/DefaultStreamSinkConfig.java | 42 ++ .../eagle/app/messaging/FlattenEventMapper.java | 60 ++ .../apache/eagle/app/messaging/JsonSchema.java | 66 +++ .../app/messaging/KafkaStreamProvider.java | 130 +++++ .../eagle/app/messaging/KafkaStreamSink.java | 97 ++++ .../app/messaging/KafkaStreamSinkConfig.java | 115 ++++ .../eagle/app/messaging/KafkaStreamSource.java | 162 ++++++ .../app/messaging/KafkaStreamSourceConfig.java | 153 +++++ .../app/messaging/MetricStreamPersist.java | 152 +++++ .../eagle/app/messaging/StormStreamSink.java | 71 +++ .../eagle/app/messaging/StormStreamSource.java | 25 + .../eagle/app/messaging/StreamEventMapper.java | 35 ++ .../eagle/app/messaging/StreamProvider.java | 50 ++ .../apache/eagle/app/messaging/StreamSink.java | 24 + .../eagle/app/messaging/StreamSource.java | 23 + .../eagle/app/service/ApplicationAction.java | 24 +- .../impl/ApplicationProviderSPILoader.java | 2 +- .../eagle/app/sink/DefaultStreamSinkConfig.java | 42 -- .../eagle/app/sink/FlattenEventMapper.java | 60 -- .../apache/eagle/app/sink/KafkaStreamSink.java | 144 ----- .../eagle/app/sink/KafkaStreamSinkConfig.java | 109 ---- .../eagle/app/sink/LoggingStreamSink.java | 55 -- .../apache/eagle/app/sink/StormStreamSink.java | 93 --- .../eagle/app/sink/StreamEventMapper.java | 35 -- .../org/apache/eagle/app/sink/StreamSink.java | 24 - .../eagle/app/sink/StreamSinkProvider.java | 42 -- .../eagle/app/spi/ApplicationProvider.java | 2 +- .../eagle/app/utils/StreamConvertHelper.java | 51 ++ .../apache/eagle/app/TestStormApplication.java | 5 +- .../app/environment/StaticEnvironmentTest.java | 2 +- .../eagle/app/storm/MockStormApplication.java | 2 +- .../eagle/app/stream/CEPFunctionTest.java | 50 ++ .../src/test/resources/application.conf | 5 +- .../org/apache/eagle/common/utils/Tuple2.java | 42 ++ .../apache/eagle/metadata/model/StreamDesc.java | 21 +- .../metadata/model/StreamSourceConfig.java | 27 + .../src/test/resources/application-test.xml | 17 + .../jdbc/provider/JDBCDataSourceProvider.java | 2 +- .../ApplicationEntityServiceJDBCImpl.java | 4 +- .../service/client/EagleServiceConnector.java | 3 +- .../client/impl/EagleServiceClientImpl.java | 15 +- .../app/example/ExampleStormApplication.java | 2 +- .../src/test/resources/application.conf | 2 +- .../org/apache/eagle/gc/GCLogApplication.java | 2 +- eagle-hadoop-metric/pom.xml | 16 +- .../eagle/metric/HadoopMetricMonitorApp.java | 38 ++ .../metric/HadoopMetricMonitorAppProdiver.java | 12 +- ...le.metric.HadoopMetricMonitorAppProdiver.xml | 4 +- .../HadoopMetricMonitorAppProdiverTest.java | 84 --- .../metric/HadoopMetricMonitorAppDebug.java | 23 + .../HadoopMetricMonitorAppProviderTest.java | 84 +++ .../eagle/metric/SendSampleDataToKafka.java | 55 ++ .../src/test/resources/application.conf | 49 ++ .../resources/hadoop_jmx_metric_sample.json | 8 + ...adoopQueueRunningApplicationHealthCheck.java | 204 +++---- .../jpm/aggregation/storm/AggregationSpout.java | 2 +- .../jpm/mr/history/MRHistoryJobApplication.java | 9 +- .../jpm/mr/running/MRRunningJobApplication.java | 4 +- .../src/test/resources/mrconf_30784.xml | 2 +- .../SparkHistoryJobApplicationHealthCheck.java | 198 +++---- .../jpm/spark/running/SparkRunningJobApp.java | 4 +- .../hbase/HBaseAuditLogApplication.java | 2 +- .../AbstractHdfsAuditLogApplication.java | 4 +- .../hive/HiveQueryMonitoringApplication.java | 2 +- .../oozie/parse/OozieAuditLogApplication.java | 2 +- eagle-server-assembly/src/main/conf/eagle.conf | 4 +- .../src/main/resources/application.conf | 4 +- .../apache/eagle/topology/TopologyCheckApp.java | 2 +- .../TopologyCheckApplicationHealthCheck.java | 218 +++---- .../hbase/HbaseTopologyEntityParser.java | 330 +++++------ .../hdfs/HdfsTopologyEntityParser.java | 579 +++++++++---------- .../extractor/mr/MRTopologyEntityParser.java | 438 +++++++------- .../topology/storm/TopologyDataPersistBolt.java | 400 ++++++------- .../entity/HdfsServiceTopologyAPIEntity.java | 246 ++++---- 89 files changed, 4118 insertions(+), 2424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java index 4628043..9705efc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java @@ -1,205 +1,205 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.coordinator; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import javax.xml.bind.annotation.adapters.XmlAdapter; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -public class StreamColumn implements Serializable { - - private static final long serialVersionUID = -5457861313624389106L; - private String name; - private Type type; - private Object defaultValue; - private boolean required; - private String description; - private String nodataExpression; - - public String toString() { - return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", - name, type, defaultValue, required, nodataExpression); - } - - public String getNodataExpression() { - return nodataExpression; - } - - public void setNodataExpression(String nodataExpression) { - this.nodataExpression = nodataExpression; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class) - public Type getType() { - return type; - } - - public void setType(Type type) { - this.type = type; - } - - @XmlJavaTypeAdapter(value = DefaultValueAdapter.class) - public Object getDefaultValue() { - return defaultValue; - } - - private void ensureDefaultValueType() { - if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) { - switch (this.getType()) { - case INT: - this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue())); - break; - case LONG: - this.setDefaultValue(Long.valueOf((String) this.getDefaultValue())); - break; - case FLOAT: - this.setDefaultValue(Float.valueOf((String) this.getDefaultValue())); - break; - case DOUBLE: - this.setDefaultValue(Double.valueOf((String) this.getDefaultValue())); - break; - case BOOL: - this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue())); - break; - case OBJECT: - try { - this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class)); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - break; - default: - throw new IllegalArgumentException("Illegal type: " + this.getType()); - } - } - } - - public void setDefaultValue(Object defaultValue) { - this.defaultValue = defaultValue; - ensureDefaultValueType(); - } - - public boolean isRequired() { - return required; - } - - public void setRequired(boolean required) { - this.required = required; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public enum Type implements Serializable { - STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object"); - - private final String name; - - Type(String name) { - this.name = name; - } - - @Override - public String toString() { - return name; - } - - @com.fasterxml.jackson.annotation.JsonCreator - public static Type getEnumFromValue(String value) { - for (Type testEnum : values()) { - if (testEnum.name.equalsIgnoreCase(value)) { - return testEnum; - } - } - throw new IllegalArgumentException(); - } - } - - public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> { - - @Override - public Type unmarshal(String v) throws Exception { - return Type.getEnumFromValue(v); - } - - @Override - public String marshal(Type v) throws Exception { - return v.name; - } - } - - public static class DefaultValueAdapter extends XmlAdapter<String, Object> { - @Override - public Object unmarshal(String v) throws Exception { - return v; - } - - @Override - public String marshal(Object v) throws Exception { - return v.toString(); - } - } - - public static class Builder { - private StreamColumn column; - - public Builder() { - column = new StreamColumn(); - } - - public Builder name(String name) { - column.setName(name); - return this; - } - - public Builder type(Type type) { - column.setType(type); - return this; - } - - public Builder defaultValue(Object defaultValue) { - column.setDefaultValue(defaultValue); - return this; - } - - public Builder required(boolean required) { - column.setRequired(required); - return this; - } - - public StreamColumn build() { - return column; - } - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import javax.xml.bind.annotation.adapters.XmlAdapter; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +public class StreamColumn implements Serializable { + + private static final long serialVersionUID = -5457861313624389106L; + private String name; + private Type type; + private Object defaultValue; + private boolean required; + private String description; + private String nodataExpression; + + public String toString() { + return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", + name, type, defaultValue, required, nodataExpression); + } + + public String getNodataExpression() { + return nodataExpression; + } + + public void setNodataExpression(String nodataExpression) { + this.nodataExpression = nodataExpression; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class) + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + @XmlJavaTypeAdapter(value = DefaultValueAdapter.class) + public Object getDefaultValue() { + return defaultValue; + } + + private void ensureDefaultValueType() { + if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) { + switch (this.getType()) { + case INT: + this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue())); + break; + case LONG: + this.setDefaultValue(Long.valueOf((String) this.getDefaultValue())); + break; + case FLOAT: + this.setDefaultValue(Float.valueOf((String) this.getDefaultValue())); + break; + case DOUBLE: + this.setDefaultValue(Double.valueOf((String) this.getDefaultValue())); + break; + case BOOL: + this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue())); + break; + case OBJECT: + try { + this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + break; + default: + throw new IllegalArgumentException("Illegal type: " + this.getType()); + } + } + } + + public void setDefaultValue(Object defaultValue) { + this.defaultValue = defaultValue; + ensureDefaultValueType(); + } + + public boolean isRequired() { + return required; + } + + public void setRequired(boolean required) { + this.required = required; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public enum Type implements Serializable { + STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object"); + + private final String name; + + Type(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + @com.fasterxml.jackson.annotation.JsonCreator + public static Type getEnumFromValue(String value) { + for (Type testEnum : values()) { + if (testEnum.name.equalsIgnoreCase(value)) { + return testEnum; + } + } + throw new IllegalArgumentException(); + } + } + + public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> { + + @Override + public Type unmarshal(String v) throws Exception { + return Type.getEnumFromValue(v); + } + + @Override + public String marshal(Type v) throws Exception { + return v.name; + } + } + + public static class DefaultValueAdapter extends XmlAdapter<String, Object> { + @Override + public Object unmarshal(String v) throws Exception { + return v; + } + + @Override + public String marshal(Object v) throws Exception { + return v.toString(); + } + } + + public static class Builder { + private StreamColumn column; + + public Builder() { + column = new StreamColumn(); + } + + public Builder name(String name) { + column.setName(name); + return this; + } + + public Builder type(Type type) { + column.setType(type); + return this; + } + + public Builder defaultValue(Object defaultValue) { + column.setDefaultValue(defaultValue); + return this; + } + + public Builder required(boolean required) { + column.setRequired(required); + return this; + } + + public StreamColumn build() { + return column; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java index e0789f9..1be36f3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java @@ -1,145 +1,145 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.coordinator; - -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlElementWrapper; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * This is actually a data source schema. - * - * @since Apr 5, 2016 - */ -public class StreamDefinition implements Serializable { - private static final long serialVersionUID = 2352202882328931825L; - - // Stream unique ID - private String streamId; - - // Stream description - private String description; - - // Is validateable or not - private boolean validate; - - // Is timeseries-based stream or not - private boolean timeseries; - - // TODO: Decouple dataSource and siteId from stream definition - - // Stream data source ID - private String dataSource; - - // Tenant (Site) ID - private String siteId; - - private List<StreamColumn> columns = new ArrayList<>(); - - public String toString() { - return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s", - streamId, - dataSource, - description, - validate, - timeseries, - columns); - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public boolean isValidate() { - return validate; - } - - public void setValidate(boolean validate) { - this.validate = validate; - } - - public boolean isTimeseries() { - return timeseries; - } - - public void setTimeseries(boolean timeseries) { - this.timeseries = timeseries; - } - - @XmlElementWrapper(name = "columns") - @XmlElement(name = "column") - public List<StreamColumn> getColumns() { - return columns; - } - - public void setColumns(List<StreamColumn> columns) { - this.columns = columns; - } - - public String getDataSource() { - return dataSource; - } - - public void setDataSource(String dataSource) { - this.dataSource = dataSource; - } - - public int getColumnIndex(String column) { - int i = 0; - for (StreamColumn col : this.getColumns()) { - if (col.getName().equals(column)) { - return i; - } - i++; - } - return -1; - } - - public String getSiteId() { - return siteId; - } - - public void setSiteId(String siteId) { - this.siteId = siteId; - } - - public StreamDefinition copy() { - StreamDefinition copied = new StreamDefinition(); - copied.setColumns(this.getColumns()); - copied.setDataSource(this.getDataSource()); - copied.setDescription(this.getDescription()); - copied.setSiteId(this.getSiteId()); - copied.setStreamId(this.getStreamId()); - copied.setTimeseries(this.isTimeseries()); - copied.setValidate(this.isValidate()); - return copied; - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.coordinator; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * This is actually a data source schema. + * + * @since Apr 5, 2016 + */ +public class StreamDefinition implements Serializable { + private static final long serialVersionUID = 2352202882328931825L; + + // Stream unique ID + private String streamId; + + // Stream description + private String description; + + // Is validateable or not + private boolean validate; + + // Is timeseries-based stream or not + private boolean timeseries; + + // TODO: Decouple dataSource and siteId from stream definition + + // Stream data source ID + private String dataSource; + + // Tenant (Site) ID + private String siteId; + + private List<StreamColumn> columns = new ArrayList<>(); + + public String toString() { + return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s", + streamId, + dataSource, + description, + validate, + timeseries, + columns); + } + + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public boolean isValidate() { + return validate; + } + + public void setValidate(boolean validate) { + this.validate = validate; + } + + public boolean isTimeseries() { + return timeseries; + } + + public void setTimeseries(boolean timeseries) { + this.timeseries = timeseries; + } + + @XmlElementWrapper(name = "columns") + @XmlElement(name = "column") + public List<StreamColumn> getColumns() { + return columns; + } + + public void setColumns(List<StreamColumn> columns) { + this.columns = columns; + } + + public String getDataSource() { + return dataSource; + } + + public void setDataSource(String dataSource) { + this.dataSource = dataSource; + } + + public int getColumnIndex(String column) { + int i = 0; + for (StreamColumn col : this.getColumns()) { + if (col.getName().equals(column)) { + return i; + } + i++; + } + return -1; + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public StreamDefinition copy() { + StreamDefinition copied = new StreamDefinition(); + copied.setColumns(this.getColumns()); + copied.setDataSource(this.getDataSource()); + copied.setDescription(this.getDescription()); + copied.setSiteId(this.getSiteId()); + copied.setStreamId(this.getStreamId()); + copied.setTimeseries(this.isTimeseries()); + copied.setValidate(this.isValidate()); + return copied; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml index a5e295e..21fdfae 100644 --- a/eagle-core/eagle-app/eagle-app-base/pom.xml +++ b/eagle-core/eagle-app/eagle-app-base/pom.xml @@ -114,6 +114,17 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-client-base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>xercesImpl</artifactId> + <groupId>xerces</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-module-junit4</artifactId> <version>${powermock.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java index 02c130a..5032aa6 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java @@ -16,8 +16,7 @@ */ package org.apache.eagle.app.environment; -import org.apache.eagle.app.sink.KafkaStreamSink; -import org.apache.eagle.app.sink.StreamSinkProvider; +import org.apache.eagle.app.messaging.*; import com.typesafe.config.Config; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.slf4j.Logger; @@ -26,25 +25,25 @@ import org.slf4j.LoggerFactory; public abstract class AbstractEnvironment implements Environment { private final Config config; - private final StreamSinkProvider sinkProvider; - private static final String APPLICATIONS_SINK_TYPE_PROPS_KEY = "application.sink.provider"; - private static final String DEFAULT_APPLICATIONS_SINK_TYPE = KafkaStreamSink.Provider.class.getName(); + private final StreamProvider streamProvider; + private static final String APPLICATIONS_MESSAGING_TYPE_PROPS_KEY = "application.stream.provider"; + private static final String DEFAULT_APPLICATIONS_MESSAGING_TYPE = KafkaStreamProvider.class.getName(); private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEnvironment.class); public AbstractEnvironment(Config config) { this.config = config; - this.sinkProvider = loadStreamSinkProvider(); + this.streamProvider = loadStreamProvider(); } - private StreamSinkProvider loadStreamSinkProvider() { - String sinkProviderClassName = config.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY) - ? config.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE; + private StreamProvider loadStreamProvider() { + String sinkProviderClassName = config.hasPath(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY) + ? config.getString(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_MESSAGING_TYPE; try { Class<?> sinkProviderClass = Class.forName(sinkProviderClassName); - if (!StreamSinkProvider.class.isAssignableFrom(sinkProviderClass)) { - throw new IllegalStateException(sinkProviderClassName + "is not assignable from " + StreamSinkProvider.class.getCanonicalName()); + if (!StreamProvider.class.isAssignableFrom(sinkProviderClass)) { + throw new IllegalStateException(sinkProviderClassName + "is not assignable from " + StreamProvider.class.getCanonicalName()); } - StreamSinkProvider instance = (StreamSinkProvider) sinkProviderClass.newInstance(); + StreamProvider instance = (StreamProvider) sinkProviderClass.newInstance(); LOGGER.info("Loaded {}", instance); return instance; } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { @@ -60,12 +59,13 @@ public abstract class AbstractEnvironment implements Environment { .append(this.config()).build(); } - public StreamSinkProvider streamSink() { - return sinkProvider; + public StreamProvider stream() { + return streamProvider; } + @Override public Config config() { return config; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java index 29e90d9..db87693 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java @@ -16,7 +16,7 @@ */ package org.apache.eagle.app.environment; -import org.apache.eagle.app.sink.StreamSinkProvider; +import org.apache.eagle.app.messaging.StreamProvider; import com.typesafe.config.Config; import java.io.Serializable; @@ -31,7 +31,7 @@ public interface Environment extends Serializable { /** * TODO Only useful for Storm/Spark Exeuctable Application instead of static web application. * - * @return StreamSinkProvider. + * @return StreamProvider. */ - StreamSinkProvider streamSink(); + StreamProvider stream(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java new file mode 100644 index 0000000..836300c --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public abstract class AggregateFunction implements TransformFunction { + private String aggFieldName; + private String resultFieldName; + private List<String> groupByFieldNames; + private long windowLengthMs; + + public List<String> getGroupByFieldNames() { + return groupByFieldNames; + } + + public void setGroupByFieldNames(List<String> groupByFieldNames) { + this.groupByFieldNames = groupByFieldNames; + } + + public String getResultFieldName() { + return resultFieldName; + } + + public void setResultFieldName(String resultFieldName) { + this.resultFieldName = resultFieldName; + } + + public String getAggFieldName() { + return aggFieldName; + } + + public void setAggFieldName(String aggFieldName) { + this.aggFieldName = aggFieldName; + } + + public AggregateFunction asField(String resultFieldName) { + this.setResultFieldName(resultFieldName); + return this; + } + + public AggregateFunction groupBy(String... groupByFieldNames) { + this.setGroupByFieldNames(Arrays.asList(groupByFieldNames)); + return this; + } + + public AggregateFunction windowBy(long windowLength, TimeUnit timeUnit) { + this.windowLengthMs = timeUnit.toMillis(windowLength); + return this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java new file mode 100644 index 0000000..83f00db --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.messaging.MetricStreamPersist; +import org.apache.eagle.app.messaging.StormStreamSource; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Storm Application Builder DSL. + */ +public class ApplicationBuilder { + private final StormEnvironment environment; + private final Config appConfig; + private final TopologyBuilder topologyBuilder; + private final AtomicInteger identifier; + + public ApplicationBuilder(Config appConfig, StormEnvironment environment) { + this.appConfig = appConfig; + this.environment = environment; + this.identifier = new AtomicInteger(0); + this.topologyBuilder = new TopologyBuilder(); + } + + public class BuilderContext { + public StormTopology toTopology() { + return topologyBuilder.createTopology(); + } + } + + public abstract class InitializedStream extends BuilderContext { + private String id; + + InitializedStream(String id) { + Preconditions.checkNotNull(id); + this.id = id; + } + + String getId() { + return this.id; + } + + /** + * Persist source data stream as metric. + */ + public BuilderContext saveAsMetric(MetricDefinition metricDefinition) { + topologyBuilder.setBolt(generateId("MetricPersist"), new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId()); + return this; + } + + public TransformedStream transformBy(TransformFunction function) { + String componentId = generateId(function.getName()); + topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId()); + return new TransformedStream(componentId); + } + } + + public class SourcedStream extends InitializedStream { + private final Config appConfig; + private final StormStreamSource streamSource; + + private SourcedStream(SourcedStream withSourcedStream) { + this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource); + } + + private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) { + super(componentId); + this.appConfig = appConfig; + this.streamSource = streamSource; + topologyBuilder.setSpout(componentId, streamSource); + } + } + + public class TransformedStream extends InitializedStream { + public TransformedStream(String id) { + super(id); + throw new IllegalStateException("TODO: Not implemented yet"); + } + } + + public TopologyBuilder getTopologyBuilder() { + return this.topologyBuilder; + } + + public StormTopology createTopology() { + return topologyBuilder.createTopology(); + } + + + public SourcedStream fromStream(String streamId) { + return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig)); + } + + public SourcedStream fromStream(SourcedStream sourcedStream) { + return new SourcedStream(sourcedStream); + } + + private String generateId(String prefix) { + return String.format("%s_%s", prefix, this.identifier.getAndIncrement()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java new file mode 100644 index 0000000..dd3b214 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import java.util.Map; + +/** + * TODO: Not implemented yet. + */ +public class CEPFunction implements TransformFunction { + + private final CEPDefinition cepDefinition; + private Collector collector; + + public CEPFunction(CEPDefinition cepDefinition) { + this.cepDefinition = cepDefinition; + } + + public CEPFunction(String siddhiQuery, String inputStreamId, String outputStreamId) { + this.cepDefinition = new CEPDefinition(siddhiQuery,inputStreamId, outputStreamId); + } + + @Override + public String getName() { + return "CEPFunction"; + } + + @Override + public void open(Collector collector) { + throw new IllegalStateException("TODO: Not implemented yet"); + } + + @Override + public void transform(Map event) { + throw new IllegalStateException("TODO: Not implemented yet"); + } + + @Override + public void close() { + throw new IllegalStateException("TODO: Not implemented yet"); + } + + public static class CEPDefinition { + private String inputStreamId; + private String outputStreamId; + private String siddhiQuery; + + public CEPDefinition(String siddhiQuery, String inputStreamId, String outputStreamId) { + this.siddhiQuery = siddhiQuery; + this.inputStreamId = inputStreamId; + this.outputStreamId = outputStreamId; + } + + public String getSiddhiQuery() { + return siddhiQuery; + } + + public void setSiddhiQuery(String siddhiQuery) { + this.siddhiQuery = siddhiQuery; + } + + public String getOutputStreamId() { + return outputStreamId; + } + + public void setOutputStreamId(String outputStreamId) { + this.outputStreamId = outputStreamId; + } + + public String getInputStreamId() { + return inputStreamId; + } + + public void setInputStreamId(String inputStreamId) { + this.inputStreamId = inputStreamId; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java new file mode 100644 index 0000000..f86d101 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import java.util.Map; + +public interface Collector { + void collect(Object key, Map event); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java new file mode 100644 index 0000000..04c5bf9 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import java.util.Map; + +public class MaxFunction extends AggregateFunction { + @Override + public String getName() { + return "MAX"; + } + + @Override + public void open(Collector collector) { + throw new IllegalStateException("TODO: Not implemented yet."); + } + + @Override + public void transform(Map event) { + throw new IllegalStateException("TODO: Not implemented yet."); + } + + @Override + public void close() { + + throw new IllegalStateException("TODO: Not implemented yet."); + } + + public static MaxFunction maxOf(String aggFieldName) { + MaxFunction function = new MaxFunction(); + function.setAggFieldName(aggFieldName); + return function; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java new file mode 100644 index 0000000..62a81b0 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class MetricDefinition implements Serializable { + + /** + * Support simple and complex name format, by default using "metric" field. + */ + private NameSelector nameSelector = new FieldNameSelector("metric"); + + /** + * Support event/system time, by default using system time. + */ + private TimestampSelector timestampSelector = new SystemTimestampSelector(); + + /** + * Metric dimension field name. + */ + private List<String> dimensionFields; + + /** + * Metric value field name. + */ + private String valueField = "value"; + + public NameSelector getNameSelector() { + return nameSelector; + } + + public void setNameSelector(NameSelector nameSelector) { + this.nameSelector = nameSelector; + } + + public String getValueField() { + return valueField; + } + + public void setValueField(String valueField) { + this.valueField = valueField; + } + + public List<String> getDimensionFields() { + return dimensionFields; + } + + public void setDimensionFields(List<String> dimensionFields) { + this.dimensionFields = dimensionFields; + } + + public TimestampSelector getTimestampSelector() { + return timestampSelector; + } + + public void setTimestampSelector(TimestampSelector timestampSelector) { + this.timestampSelector = timestampSelector; + } + + + @FunctionalInterface + public interface NameSelector extends Serializable { + String getMetricName(Map event); + } + + @FunctionalInterface + public interface TimestampSelector extends Serializable { + Long getTimestamp(Map event); + } + + public static MetricDefinition namedBy(NameSelector nameSelector) { + MetricDefinition metricDefinition = new MetricDefinition(); + metricDefinition.setNameSelector(nameSelector); + return metricDefinition; + } + + public static MetricDefinition namedByField(String nameField) { + MetricDefinition metricDefinition = new MetricDefinition(); + metricDefinition.setNameSelector(new FieldNameSelector(nameField)); + return metricDefinition; + } + + public MetricDefinition eventTimeByField(String timestampField) { + this.setTimestampSelector(new EventTimestampSelector(timestampField)); + return this; + } + + public MetricDefinition dimensionFields(String... dimensionFields) { + this.setDimensionFields(Arrays.asList(dimensionFields)); + return this; + } + + public MetricDefinition valueField(String valueField) { + this.setValueField(valueField); + return this; + } + + public class EventTimestampSelector implements TimestampSelector { + private final String timestampField; + + EventTimestampSelector(String timestampField) { + this.timestampField = timestampField; + } + + @Override + public Long getTimestamp(Map event) { + if (event.containsKey(timestampField)) { + Object timestampValue = event.get(timestampField); + if (timestampValue instanceof Integer) { + return Long.valueOf((Integer) timestampValue); + } + if (timestampValue instanceof String) { + return Long.valueOf((String) timestampValue); + } else { + return (Long) timestampValue; + } + } else { + throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists"); + } + } + } + + public static class SystemTimestampSelector implements TimestampSelector { + @Override + public Long getTimestamp(Map event) { + return System.currentTimeMillis(); + } + } + + public static class FieldNameSelector implements NameSelector { + private final String fieldName; + + FieldNameSelector(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public String getMetricName(Map event) { + if (event.containsKey(fieldName)) { + return (String) event.get(fieldName); + } else { + throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java new file mode 100644 index 0000000..9135cc8 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import backtype.storm.task.OutputCollector; + +import java.util.Arrays; + +import java.util.Map; + +public class StormOutputCollector implements Collector { + private final OutputCollector delegate; + + StormOutputCollector(OutputCollector delegate) { + this.delegate = delegate; + } + + @Override + public void collect(Object key, Map event) { + delegate.emit(Arrays.asList(key, event)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java new file mode 100644 index 0000000..11974ff --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import java.io.Serializable; +import java.util.Map; + +public interface TransformFunction extends Serializable { + String getName(); + + void open(Collector collector); + + void transform(Map event); + + void close(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java new file mode 100644 index 0000000..dbc7239 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.app.utils.StreamConvertHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class TransformFunctionBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(TransformFunctionBolt.class); + private final TransformFunction function; + private OutputCollector collector; + + public TransformFunctionBolt(TransformFunction function) { + this.function = function; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.function.open(new StormOutputCollector(collector)); + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + try { + this.function.transform(StreamConvertHelper.tupleToEvent(input).f1()); + this.collector.ack(input); + } catch (Throwable throwable) { + LOG.error("Transform error: {}", input, throwable); + this.collector.reportError(throwable); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1","f2")); + } + + @Override + public void cleanup() { + this.function.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java index 18675a6..ae52cd0 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java @@ -17,8 +17,13 @@ package org.apache.eagle.app.environment.impl; import org.apache.eagle.app.environment.AbstractEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.environment.builder.ApplicationBuilder; +import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.environment.builder.TransformFunction; +import org.apache.eagle.app.environment.builder.TransformFunctionBolt; +import org.apache.eagle.app.messaging.*; import com.typesafe.config.Config; +import org.apache.eagle.metadata.model.StreamSourceConfig; /** * Storm Execution Environment Context. @@ -28,7 +33,30 @@ public class StormEnvironment extends AbstractEnvironment { super(envConfig); } + // ---------------------------------- + // Classic Storm Topology Builder API + // ---------------------------------- public StormStreamSink getStreamSink(String streamId, Config config) { - return ((StormStreamSink) streamSink().getSink(streamId,config)); + return ((StormStreamSink) stream().getSink(streamId,config)); } -} + + public StormStreamSource getStreamSource(String streamId, Config config) { + return (StormStreamSource) stream().getSource(streamId,config); + } + + public MetricStreamPersist getMetricPersist(MetricDefinition metricDefinition, Config config) { + return new MetricStreamPersist(metricDefinition, config); + } + + public TransformFunctionBolt getTransformer(TransformFunction function) { + return new TransformFunctionBolt(function); + } + + // ---------------------------------- + // Fluent Storm App Builder API + // ---------------------------------- + + public ApplicationBuilder newApp(Config appConfig) { + return new ApplicationBuilder(appConfig, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java new file mode 100644 index 0000000..d1cecc9 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import org.apache.eagle.metadata.model.StreamSinkConfig; + +public class DefaultStreamSinkConfig implements StreamSinkConfig { + private final Class<?> streamPersistClass; + private static final String NONE_STORAGE_TYPE = "NONE"; + + public DefaultStreamSinkConfig(Class<?> streamPersistClass) { + this.streamPersistClass = streamPersistClass; + } + + @Override + public String getType() { + return NONE_STORAGE_TYPE; + } + + public Class<?> getSinkType() { + return streamPersistClass; + } + + @Override + public Class<? extends StreamSinkConfig> getConfigType() { + return DefaultStreamSinkConfig.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java new file mode 100644 index 0000000..c8fe1b5 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import org.apache.eagle.alert.engine.model.StreamEvent; +import backtype.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +public class FlattenEventMapper implements StreamEventMapper { + private final String streamId; + private static final String TIMESTAMP_FIELD = "timestamp"; + private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class); + + public FlattenEventMapper(String streamId) { + this.streamId = streamId; + } + + @Override + public List<StreamEvent> map(Tuple tuple) throws Exception { + long timestamp; + if (tuple.getFields().contains(TIMESTAMP_FIELD)) { + try { + timestamp = tuple.getLongByField("timestamp"); + } catch (Exception ex) { + // if timestamp is not null + LOGGER.error(ex.getMessage(), ex); + timestamp = 0; + } + } else { + timestamp = System.currentTimeMillis(); + } + Object[] values = new Object[tuple.getFields().size()]; + for (int i = 0; i < tuple.getFields().size(); i++) { + values[i] = tuple.getValue(i); + } + StreamEvent event = new StreamEvent(); + event.setTimestamp(timestamp); + event.setStreamId(streamId); + event.setData(values); + return Collections.singletonList(event); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java new file mode 100644 index 0000000..987ed0b --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import backtype.storm.spout.Scheme; +import backtype.storm.tuple.Fields; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * General Json Schema. + * Different from org.apache.eagle.alert.engine.scheme.JsonScheme which is just to multi-topic cases. + * + * @see org.apache.eagle.alert.engine.scheme.JsonScheme + */ +public class JsonSchema implements Scheme { + private static final long serialVersionUID = -8352896475656975577L; + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonSchema.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + @Override + public Fields getOutputFields() { + return new Fields("f1","f2"); + } + + @Override + @SuppressWarnings("rawtypes") + public List<Object> deserialize(byte[] ser) { + try { + if (ser != null) { + Map map = mapper.readValue(ser, Map.class); + return Arrays.asList(map.hashCode(), map); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Content is null, ignore"); + } + } + } catch (IOException e) { + try { + LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + } + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java new file mode 100644 index 0000000..13080a1 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.messaging; + +import backtype.storm.spout.Scheme; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, KafkaStreamSinkConfig,KafkaStreamSource,KafkaStreamSourceConfig> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class); + private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic"; + private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic"; + + private String getSinkTopicName(String streamId, Config config) { + String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId); + if (config.hasPath(streamSpecificTopicConfigKey)) { + return config.getString(streamSpecificTopicConfigKey); + } else if (config.hasPath(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)) { + LOG.warn("Using default shared sink topic {}: {}", DEFAULT_SHARED_SINK_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)); + return config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); + } else { + LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); + throw new IllegalArgumentException("Neither stream specific topic: " + + streamSpecificTopicConfigKey + " nor default shared topic: " + DEFAULT_SHARED_SINK_TOPIC_CONF_KEY + " found in config"); + } + } + + private String getSourceTopicName(String streamId, Config config) { + String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);; + if (config.hasPath(streamSpecificTopicConfigKey)) { + return config.getString(streamSpecificTopicConfigKey); + } else if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) { + LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)); + return config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY); + } else { + LOG.debug("Neither stream specific topic: {} nor default shared topic: {} found in config, try sink config instead", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); + return getSinkTopicName(streamId,config); + } + } + + @Override + public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) { + KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig(); + sinkConfig.setTopicId(getSinkTopicName(streamId,config)); + sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList")); + sinkConfig.setSerializerClass(config.hasPath("dataSinkConfig.serializerClass") + ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder"); + sinkConfig.setKeySerializerClass(config.hasPath("dataSinkConfig.keySerializerClass") + ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder"); + + // new added properties for async producer + sinkConfig.setNumBatchMessages(config.hasPath("dataSinkConfig.numBatchMessages") + ? config.getString("dataSinkConfig.numBatchMessages") : "1024"); + sinkConfig.setProducerType(config.hasPath("dataSinkConfig.producerType") + ? config.getString("dataSinkConfig.producerType") : "async"); + sinkConfig.setMaxQueueBufferMs(config.hasPath("dataSinkConfig.maxQueueBufferMs") + ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000"); + sinkConfig.setRequestRequiredAcks(config.hasPath("dataSinkConfig.requestRequiredAcks") + ? config.getString("dataSinkConfig.requestRequiredAcks") : "1"); + + return sinkConfig; + } + + @Override + public KafkaStreamSink getSink() { + return new KafkaStreamSink(); + } + + @Override + public KafkaStreamSourceConfig getSourceConfig(String streamId, Config config) { + KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig(); + + sourceConfig.setTopicId(getSourceTopicName(streamId,config)); + sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection")); + + if (config.hasPath("dataSourceConfig.fetchSize")) { + sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize")); + } + if (config.hasPath("dataSourceConfig.transactionZKRoot")) { + sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot")); + } + if (config.hasPath("dataSourceConfig.consumerGroupId")) { + sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId")); + } + if (config.hasPath("dataSourceConfig.brokerZkPath")) { + sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath")); + } + if (config.hasPath("dataSourceConfig.txZkServers")) { + sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers")); + } + if (config.hasPath("dataSourceConfig.transactionStateUpdateMS")) { + sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS")); + } + if (config.hasPath("dataSourceConfig.startOffsetTime")) { + sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime")); + } + if (config.hasPath("dataSourceConfig.forceFromStart")) { + sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart")); + } + if (config.hasPath("dataSourceConfig.schemeCls")) { + try { + sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(config.getString("dataSourceConfig.schemeCls"))); + } catch (ClassNotFoundException e) { + LOG.error("Class not found error, dataSourceConfig.schemeCls = {}",config.getString("dataSourceConfig.schemeCls"),e); + } + } + return sourceConfig; + } + + @Override + public KafkaStreamSource getSource() { + return new KafkaStreamSource(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java new file mode 100644 index 0000000..696d79f --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import com.fasterxml.jackson.databind.ObjectMapper; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class); + private String topicId; + private Producer producer; + private KafkaStreamSinkConfig config; + + @Override + public void init(String streamId, KafkaStreamSinkConfig config) { + super.init(streamId, config); + this.topicId = config.getTopicId(); + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + super.prepare(stormConf, context, collector); + Properties properties = new Properties(); + properties.put("metadata.broker.list", config.getBrokerList()); + properties.put("serializer.class", config.getSerializerClass()); + properties.put("key.serializer.class", config.getKeySerializerClass()); + // new added properties for async producer + properties.put("producer.type", config.getProducerType()); + properties.put("batch.num.messages", config.getNumBatchMessages()); + properties.put("request.required.acks", config.getRequestRequiredAcks()); + properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs()); + ProducerConfig producerConfig = new ProducerConfig(properties); + producer = new Producer(producerConfig); + } + + @Override + protected void execute(Object key, Map event, OutputCollector collector) throws Exception { + try { + String output = new ObjectMapper().writeValueAsString(event); + // partition key may cause data skew + //producer.send(new KeyedMessage(this.topicId, key, output)); + producer.send(new KeyedMessage(this.topicId, output)); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + throw ex; + } + } + + @Override + public void afterInstall() { + ensureTopicCreated(); + } + + private void ensureTopicCreated() { + LOG.info("TODO: ensure kafka topic {} created", this.topicId); + } + + private void ensureTopicDeleted() { + LOG.info("TODO: ensure kafka topic {} deleted", this.topicId); + } + + @Override + public void cleanup() { + if (this.producer != null) { + this.producer.close(); + } + } + + @Override + public void afterUninstall() { + ensureTopicDeleted(); + } +} \ No newline at end of file