http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java index d897454..80990a5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java @@ -35,7 +35,7 @@ public class TestTopologyMgmtService extends TopologyMgmtService { private int routerNumber; private int i = 0; private String namePrefix = "Topology"; - + // a config used to check if createTopology is enabled. FIXME: another class of mgmt service might be better private boolean enableCreateTopology = false;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf index 1ef71a0..72b8012 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf @@ -14,38 +14,38 @@ # limitations under the License. { - "coordinator" : { - "policiesPerBolt" : 5, - "boltParallelism" : 5, - "policyDefaultParallelism" : 5, - "boltLoadUpbound": 0.8, - "topologyLoadUpbound" : 0.8, - "numOfAlertBoltsPerTopology" : 5, - "zkConfig" : { - "zkQuorum" : "localhost:2181", - "zkRoot" : "/alert", - "zkSessionTimeoutMs" : 10000, - "connectionTimeoutMs" : 10000, - "zkRetryTimes" : 3, - "zkRetryInterval" : 3000 - } - "metadataService" : { - "host" : "localhost", - "port" : 8080, - "context" : "/rest" - } - "metadataDynamicCheck" : { - "initDelayMillis" : 1000, - "delayMillis" : 30000 - }, - "kafkaProducer": { - "bootstrapServers": "localhost:9092" - }, - "email": { - "sender": "ea...@eagle.com", - "recipients": "t...@eagle.com", - "mailSmtpHost": "test.eagle.com", - "mailSmtpPort": "25" - } - } + "coordinator": { + "policiesPerBolt": 5, + "boltParallelism": 5, + "policyDefaultParallelism": 5, + "boltLoadUpbound": 0.8, + "topologyLoadUpbound": 0.8, + "numOfAlertBoltsPerTopology": 5, + "zkConfig": { + "zkQuorum": "localhost:2181", + "zkRoot": "/alert", + "zkSessionTimeoutMs": 10000, + "connectionTimeoutMs": 10000, + "zkRetryTimes": 3, + "zkRetryInterval": 3000 + } + "metadataService": { + "host": "localhost", + "port": 8080, + "context": "/rest" + } + "metadataDynamicCheck": { + "initDelayMillis": 1000, + "delayMillis": 30000 + }, + "kafkaProducer": { + "bootstrapServers": "localhost:9092" + }, + "email": { + "sender": "ea...@eagle.com", + "recipients": "t...@eagle.com", + "mailSmtpHost": "test.eagle.com", + "mailSmtpPort": "25" + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties index d4bc126..53f091e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties @@ -12,9 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - log4j.rootLogger=INFO, stdout - # standard output log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json index 8678fe6..ef697f8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json @@ -1,19 +1,19 @@ [ -{ - "name": "network_syslog_datasource", - "type": "KAFKA", - "properties": { - }, - "topic": "logoutput", - "schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme", - "codec": { - "streamNameSelectorProp": { - "fieldNamesToInferStreamName" : "namespace", - "streamNameFormat":"stream_%s" - }, - "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector", - "timestampColumn": null, - "timestampFormat":"" - } -} + { + "name": "network_syslog_datasource", + "type": "KAFKA", + "properties": { + }, + "topic": "logoutput", + "schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme", + "codec": { + "streamNameSelectorProp": { + "fieldNamesToInferStreamName": "namespace", + "streamNameFormat": "stream_%s" + }, + "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector", + "timestampColumn": null, + "timestampFormat": "" + } + } ] http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json index d306be2..9e8b0d3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json @@ -1,26 +1,32 @@ [ -{ - "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", - "name":"network-syslog-publish", - "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"], - "dedupIntervalMin": "PT0M", - "properties":{ - "kafka_broker":"127.0.0.1:9092", - "topic":"syslog_alerts", - "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", - "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer" + { + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", + "name": "network-syslog-publish", + "policyIds": [ + "syslog_severity_check_stream_demons1", + "syslog_severity_check_stream_umpns2" + ], + "dedupIntervalMin": "PT0M", + "properties": { + "kafka_broker": "127.0.0.1:9092", + "topic": "syslog_alerts", + "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", + "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer" + }, + "serializer": "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer" }, - "serializer" : "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer" -}, -{ - "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", - "name":"network-syslog-publish_rawjson", - "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"], - "dedupIntervalMin": "PT0M", - "properties":{ - "kafka_broker":"127.0.0.1:9092", - "topic":"syslog_alerts_json" - }, - "serializer" : "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" -} + { + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", + "name": "network-syslog-publish_rawjson", + "policyIds": [ + "syslog_severity_check_stream_demons1", + "syslog_severity_check_stream_umpns2" + ], + "dedupIntervalMin": "PT0M", + "properties": { + "kafka_broker": "127.0.0.1:9092", + "topic": "syslog_alerts_json" + }, + "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" + } ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json index cee6f8c..a4ecb79 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json @@ -1,138 +1,152 @@ [ -{ - "streamId": "stream_demons1", - "dataSource" : "network_syslog_datasource", - "description":"the data stream for syslog events", - "validate": false, - "timeseries":false, - "columns": [ - { - "name": "dims_facility", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "dims_severity", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "dims_hostname", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "dims_msgid", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "timestamp", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "state", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "interface", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "version", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "type", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "origmsg", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "name", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "namespace", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "epochMillis", - "type" : "LONG", - "defaultValue": 0, - "required": true - } - ] -}, -{ - "streamId": "stream_umpns2", - "dataSource" : "network_syslog_datasource", - "description":"the data stream for syslog events", - "validate": false, - "timeseries":false, - "columns": [ - { - "name": "dims_facility", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "dims_severity", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "dims_hostname", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "dims_msgid", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "program", - "type" : "STRING", - "defaultValue": "", - "required":true - },{ - "name": "message", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "name", - "type" : "STRING", - "defaultValue": "", - "required": true - }, - { - "name": "namespace", - "type" : "STRING", - "defaultValue": "", - "required": true - },{ - "name": "epochMillis", - "type" : "LONG", - "defaultValue": 0, - "required": true - } - ] -} + { + "streamId": "stream_demons1", + "dataSource": "network_syslog_datasource", + "description": "the data stream for syslog events", + "validate": false, + "timeseries": false, + "columns": [ + { + "name": "dims_facility", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_severity", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_hostname", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_msgid", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "timestamp", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "state", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "interface", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "version", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "type", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "origmsg", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "name", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "namespace", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "epochMillis", + "type": "LONG", + "defaultValue": 0, + "required": true + } + ] + }, + { + "streamId": "stream_umpns2", + "dataSource": "network_syslog_datasource", + "description": "the data stream for syslog events", + "validate": false, + "timeseries": false, + "columns": [ + { + "name": "dims_facility", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_severity", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_hostname", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_msgid", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "program", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "message", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "name", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "namespace", + "type": "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "epochMillis", + "type": "LONG", + "defaultValue": 0, + "required": true + } + ] + } ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json index 7b1732f..4b53233 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json @@ -1,31 +1,31 @@ [ -{ - "name": "alertUnitTopology_1", - "numOfSpout":1, - "numOfGroupBolt": 4, - "numOfAlertBolt": 10, - "spoutId": "alertEngineSpout", - "groupNodeIds" : [ - "streamRouterBolt0", - "streamRouterBolt1", - "streamRouterBolt2", - "streamRouterBolt3" - ], - "alertBoltIds": [ - "alertBolt0", - "alertBolt1", - "alertBolt2", - "alertBolt3", - "alertBolt4", - "alertBolt5", - "alertBolt6", - "alertBolt7", - "alertBolt8", - "alertBolt9" - ], - "pubBoltId" : "alertPublishBolt", - "spoutParallelism": 1, - "groupParallelism": 1, - "alertParallelism": 1 -} + { + "name": "alertUnitTopology_1", + "numOfSpout": 1, + "numOfGroupBolt": 4, + "numOfAlertBolt": 10, + "spoutId": "alertEngineSpout", + "groupNodeIds": [ + "streamRouterBolt0", + "streamRouterBolt1", + "streamRouterBolt2", + "streamRouterBolt3" + ], + "alertBoltIds": [ + "alertBolt0", + "alertBolt1", + "alertBolt2", + "alertBolt3", + "alertBolt4", + "alertBolt5", + "alertBolt6", + "alertBolt7", + "alertBolt8", + "alertBolt9" + ], + "pubBoltId": "alertPublishBolt", + "spoutParallelism": 1, + "groupParallelism": 1, + "alertParallelism": 1 + } ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf index 63be6a9..963d2ad 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf @@ -14,38 +14,38 @@ # limitations under the License. { - "coordinator" : { - "policiesPerBolt" : 5, - "boltParallelism" : 5, - "policyDefaultParallelism" : 5, - "boltLoadUpbound": 0.8, - "topologyLoadUpbound" : 0.8, - "numOfAlertBoltsPerTopology" : 5, - "zkConfig": { - "zkQuorum" : "localhost:2181", - "zkRoot": "/alert", - "zkSessionTimeoutMs" : 10000, - "connectionTimeoutMs" : 5000, - "zkRetryTimes" : 5, - "zkRetryInterval" : 1000 - }, - "metadataService": { - "context" : "/api", - "host" : "localhost", - "port" : 8080 - }, - "metadataDynamicCheck" : { - "initDelayMillis" : 1000, - "delayMillis" : 30000 - }, - "kafkaProducer": { - "bootstrapServers": "localhost:9092" - }, - "email": { - "sender": "ea...@eagle.com", - "recipients": "t...@eagle.com", - "mailSmtpHost": "test.eagle.com", - "mailSmtpPort": "25" - } - } + "coordinator": { + "policiesPerBolt": 5, + "boltParallelism": 5, + "policyDefaultParallelism": 5, + "boltLoadUpbound": 0.8, + "topologyLoadUpbound": 0.8, + "numOfAlertBoltsPerTopology": 5, + "zkConfig": { + "zkQuorum": "localhost:2181", + "zkRoot": "/alert", + "zkSessionTimeoutMs": 10000, + "connectionTimeoutMs": 5000, + "zkRetryTimes": 5, + "zkRetryInterval": 1000 + }, + "metadataService": { + "context": "/api", + "host": "localhost", + "port": 8080 + }, + "metadataDynamicCheck": { + "initDelayMillis": 1000, + "delayMillis": 30000 + }, + "kafkaProducer": { + "bootstrapServers": "localhost:9092" + }, + "email": { + "sender": "ea...@eagle.com", + "recipients": "t...@eagle.com", + "mailSmtpHost": "test.eagle.com", + "mailSmtpPort": "25" + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties index a9e0010..e2618c2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties @@ -13,14 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults - ############################# Server Basics ############################# - # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 - ############################# Socket Server Settings ############################# - # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: @@ -28,44 +24,31 @@ broker.id=0 # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 - # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 - # The number of threads handling network requests num.network.threads=3 - # The number of threads doing disk I/O num.io.threads=8 - # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 - # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 - # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 - - ############################# Log Basics ############################# - # A comma seperated list of directories under which to store log files log.dirs=/tmp/dev-kafka-logs - # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 - # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 - ############################# Log Flush Policy ############################# - # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: @@ -74,42 +57,31 @@ num.recovery.threads.per.data.dir=1 # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. - # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 - # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 - ############################# Log Retention Policy ############################# - # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. - # The minimum age of a log file to be eligible for deletion log.retention.hours=168 - # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 - # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 - # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 - ############################# Zookeeper ############################# - # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2181 - # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java index 4fe471c..04a8cba 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.alert.tools; +import org.apache.eagle.alert.config.ZKConfig; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; @@ -23,8 +24,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; -import org.apache.eagle.alert.config.ZKConfig; - import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,10 +34,10 @@ public class KafkaConsumerOffsetFetcher { public ObjectMapper mapper; public String zkPathToPartition; - public KafkaConsumerOffsetFetcher(ZKConfig config, String ... parameters) { + public KafkaConsumerOffsetFetcher(ZKConfig config, String... parameters) { try { this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000, - new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)); + new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)); curator.start(); this.zkRoot = config.zkRoot; mapper = new ObjectMapper(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java index b78e233..e547a25 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java @@ -17,12 +17,16 @@ package org.apache.eagle.alert.tools; -import java.util.*; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; -import kafka.javaapi.*; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; +import java.util.*; + public class KafkaLatestOffsetFetcher { private List<String> brokerList; @@ -50,7 +54,9 @@ public class KafkaLatestOffsetFetcher { String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); long latestOffset = getLatestOffset(consumer, topic, partition, clientName); - if (consumer != null) consumer.close(); + if (consumer != null) { + consumer.close(); + } ret.put(partition, latestOffset); } return ret; @@ -63,7 +69,7 @@ public class KafkaLatestOffsetFetcher { kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { - throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) ); + throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition)); } long[] offsets = response.offsets(topic, partition); return offsets[0]; @@ -85,11 +91,15 @@ public class KafkaLatestOffsetFetcher { partitionMetadata.put(part.partitionId(), part); } } - if (partitionMetadata.size() == partitionCount) break; + if (partitionMetadata.size() == partitionCount) { + break; + } } catch (Exception e) { throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e); } finally { - if (consumer != null) consumer.close(); + if (consumer != null) { + consumer.close(); + } } } return partitionMetadata; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala index 830e9ac..b89737a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala @@ -176,9 +176,9 @@ object ProducerTool { val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] - if (messageData.size()>0) { + if (messageData.size() > 0) { reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps) - } else if (messageFile.size()>0) { + } else if (messageFile.size() > 0) { reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps) } else { reader.init(System.in, cmdLineProps) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java index e4d9f8f..89b2921 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java @@ -43,17 +43,17 @@ public class TestKafkaOffset { String topic = "testTopic1"; String topology = "alertUnitTopology_1"; - while(true) { + while (true) { KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology); String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList"); KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList); Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch(); - if(consumedOffset.size() == 0){ + if (consumedOffset.size() == 0) { System.out.println("no any consumer offset found for this topic " + topic); } Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size()); - if(latestOffset.size() == 0){ + if (latestOffset.size() == 0) { System.out.println("no any latest offset found for this topic " + topic); } for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) { @@ -61,7 +61,7 @@ public class TestKafkaOffset { Integer partitionNumber = Integer.valueOf(partition.split("_")[1]); Long lag = latestOffset.get(partitionNumber) - entry.getValue(); System.out.println(String.format("parition %s, total: %d, consumed: %d, lag: %d", - partition, latestOffset.get(partitionNumber), entry.getValue(), lag)); + partition, latestOffset.get(partitionNumber), entry.getValue(), lag)); } Thread.sleep(10000); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf index 7819178..1da6fb1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -"dataSourceConfig":{ - "kafkaBrokerList" : "kafka-broker:9092", - "zkQuorum" : "zk-admin:2181", - "transactionZKRoot" : "/consumers/eagle_consumer/%s/%s", - "zkSessionTimeoutMs" : 10000, - "connectionTimeoutMs" : 10000, - "zkRetryTimes" : 3, - "zkRetryInterval" : 3000 +"dataSourceConfig": { + "kafkaBrokerList": "kafka-broker:9092", + "zkQuorum": "zk-admin:2181", + "transactionZKRoot": "/consumers/eagle_consumer/%s/%s", + "zkSessionTimeoutMs": 10000, + "connectionTimeoutMs": 10000, + "zkRetryTimes": 3, + "zkRetryInterval": 3000 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties index d59ded6..9c6875d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties @@ -12,9 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - log4j.rootLogger=INFO, stdout - # standard output log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml index d2f889b..89728fe 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml @@ -47,8 +47,8 @@ <artifactId>jersey-client</artifactId> </dependency> <!--<dependency>--> - <!--<groupId>org.codehaus.jackson</groupId>--> - <!--<artifactId>jackson-jaxrs</artifactId>--> + <!--<groupId>org.codehaus.jackson</groupId>--> + <!--<artifactId>jackson-jaxrs</artifactId>--> <!--</dependency>--> <dependency> <groupId>com.netflix.archaius</groupId> @@ -80,11 +80,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> </dependency> @@ -114,10 +109,6 @@ <artifactId>mapdb</artifactId> </dependency> <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java index 7219271..4e3c275 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java @@ -20,8 +20,9 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent; */ public interface AlertStreamCollector extends Collector<AlertStreamEvent> { /** - * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method + * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method. */ void flush(); + void close(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java index 9a6489c..d1a2b56 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java @@ -19,7 +19,7 @@ package org.apache.eagle.alert.engine; @FunctionalInterface public interface Collector<T> { /** - * Must make sure thread-safe + * Must make sure thread-safe. * * @param t */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java index 3a99e98..7310d8b 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java @@ -19,11 +19,11 @@ package org.apache.eagle.alert.engine; import org.apache.eagle.alert.engine.model.PartitionedEvent; /** - * Executed in thread-safe trace + * Executed in thread-safe trace. */ public interface PartitionedEventCollector extends Collector<PartitionedEvent> { /** - * @param event to be dropped + * @param event to be dropped. */ void drop(PartitionedEvent event); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java index 609378b..a03932f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java @@ -1,7 +1,6 @@ package org.apache.eagle.alert.engine; import backtype.storm.metric.api.MultiCountMetric; - import com.typesafe.config.Config; /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java index 4b2d68f..d02028a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java @@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine; import backtype.storm.metric.api.MultiCountMetric; import backtype.storm.task.TopologyContext; - import com.typesafe.config.Config; public class StreamContextImpl implements StreamContext { @@ -26,7 +25,7 @@ public class StreamContextImpl implements StreamContext { private final MultiCountMetric counter; public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) { - this.counter=counter; + this.counter = counter; this.config = config; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java index 57529b6..497d908 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java @@ -19,28 +19,30 @@ package org.apache.eagle.alert.engine; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; import org.apache.eagle.alert.config.ZKConfig; import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; import org.apache.eagle.alert.engine.runner.UnitTopologyRunner; - import backtype.storm.generated.StormTopology; - import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + /** - * Since 5/3/16. Make sure unit topology can be started either from command line + * Make sure unit topology can be started either from command line * or from remote A few parameters for starting unit topology 1. number of spout * tasks 2. number of router bolts 3. number of alert bolts 4. number of publish * bolts * - * Connections 1. spout and router bolt 2. router bolt and alert bolt 3. alert - * bolt and publish bolt + * <p>Connections 1. spout and router bolt 2. router bolt and alert bolt 3. alert + * bolt and publish bolt.</p> + * + * @since 5/3/16. + * */ public class UnitTopologyMain { @@ -48,7 +50,7 @@ public class UnitTopologyMain { // command line parse Options options = new Options(); options.addOption("c", true, - "config URL (valid file name) - defaults application.conf according to typesafe config default behavior."); + "config URL (valid file name) - defaults application.conf according to typesafe config default behavior."); CommandLineParser parser = new DefaultParser(); CommandLine cmd = parser.parse(options, args); @@ -64,7 +66,7 @@ public class UnitTopologyMain { ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); new UnitTopologyRunner(changeNotifyService).run(topologyId, config); } - + public static void runTopology(Config config, backtype.storm.Config stormConfig) { // load config and start String topologyId = config.getString("topology.name"); @@ -77,7 +79,7 @@ public class UnitTopologyMain { ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId); return changeNotifyService; } - + public static StormTopology createTopology(Config config) { String topologyId = config.getString("topology.name"); ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java index 22fe56a..5aa754e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java @@ -16,35 +16,31 @@ */ package org.apache.eagle.alert.engine.coordinator; -import java.io.Closeable; -import java.io.Serializable; - import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; import org.apache.eagle.alert.engine.router.SpoutSpecListener; import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; - import com.typesafe.config.Config; +import java.io.Closeable; +import java.io.Serializable; + /** * IMetadataChangeNotifyService defines the following features * 1) initialization * 2) register metadata change listener * - * In distributed environment for example storm platform, + * <p>In distributed environment for example storm platform, * subclass implementing this interface should have the following lifecycle * 1. object is created in client machine * 2. object is serialized and transferred to other machine * 3. object is created through deserialization * 4. invoke init() method to do initialization * 5. invoke various registerListener to get notified of config change - * 6. invoke close() to release system resource + * 6. invoke close() to release system resource</p> */ -public interface IMetadataChangeNotifyService extends Closeable,Serializable { - /** - * - * @param config - */ +public interface IMetadataChangeNotifyService extends Closeable, Serializable { + void init(Config config, MetadataType type); void registerListener(SpoutSpecListener listener); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java index 5814ac6..8c493f5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java @@ -1,8 +1,4 @@ -package org.apache.eagle.alert.engine.coordinator; - -import java.io.IOException; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -18,6 +14,10 @@ import java.io.IOException; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.coordinator; + +import java.io.IOException; + public class StreamDefinitionNotFoundException extends IOException { private static final long serialVersionUID = 6027811718016485808L; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java index 52fcc04..5da4cbd 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java @@ -16,13 +16,6 @@ */ package org.apache.eagle.alert.engine.coordinator.impl; -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.coordination.model.RouterSpec; @@ -34,28 +27,30 @@ import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; import org.apache.eagle.alert.engine.router.SpoutSpecListener; import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * notify 3 components of metadata change Spout, StreamRouterBolt and AlertBolt + * notify 3 components of metadata change Spout, StreamRouterBolt and AlertBolt. */ -@SuppressWarnings({ "serial" }) -public abstract class AbstractMetadataChangeNotifyService implements IMetadataChangeNotifyService, Closeable, - Serializable { - private final static Logger LOG = LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class); +@SuppressWarnings( {"serial"}) +public abstract class AbstractMetadataChangeNotifyService implements IMetadataChangeNotifyService, Closeable, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class); private final List<StreamRouterBoltSpecListener> streamRouterBoltSpecListeners = new ArrayList<>(); private final List<SpoutSpecListener> spoutSpecListeners = new ArrayList<>(); private final List<AlertBoltSpecListener> alertBoltSpecListeners = new ArrayList<>(); private final List<AlertPublishSpecListener> alertPublishSpecListeners = new ArrayList<>(); protected MetadataType type; - /** - * @param config - */ @Override public void init(Config config, MetadataType type) { this.type = type; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java index 1b93072..9b31727 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java @@ -16,40 +16,32 @@ */ package org.apache.eagle.alert.engine.coordinator.impl; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import org.apache.eagle.alert.config.ConfigBusConsumer; import org.apache.eagle.alert.config.ConfigChangeCallback; import org.apache.eagle.alert.config.ConfigValue; import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.coordination.model.RouterSpec; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition; -import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition; +import org.apache.eagle.alert.coordination.model.*; import org.apache.eagle.alert.engine.coordinator.MetadataType; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; +import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.typesafe.config.Config; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; /** * <b>TODO</b>: performance tuning: It is not JVM level service, so it may cause * zookeeper burden in case of too many listeners This does not support * dynamically adding topic, all topics should be available when service object * is created. - * <p> * ZK path format is as following: * <ul> * <li>/alert/topology_1/spout</li> @@ -80,22 +72,18 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS LOG.info("init called for client"); } - /** - * @seeAlso Coordinator - * @return - */ private String getMetadataTopicSuffix() { switch (type) { - case ALERT_BOLT: - return "alert"; - case ALERT_PUBLISH_BOLT: - return "publisher"; - case SPOUT: - return "spout"; - case STREAM_ROUTER_BOLT: - return "router"; - default: - throw new RuntimeException(String.format("unexpected metadata type: %s !", type)); + case ALERT_BOLT: + return "alert"; + case ALERT_PUBLISH_BOLT: + return "publisher"; + case SPOUT: + return "spout"; + case STREAM_ROUTER_BOLT: + return "router"; + default: + throw new RuntimeException(String.format("unexpected metadata type: %s !", type)); } } @@ -107,7 +95,7 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS @Override public void onNewConfig(ConfigValue value) { - LOG.info("Metadata changed {}",value); + LOG.info("Metadata changed {}", value); if (client == null) { LOG.error("OnNewConfig trigger, but metadata service client is null. Metadata type {}", type); @@ -126,42 +114,42 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS } Map<String, StreamDefinition> sds = getStreams(state.getStreamSnapshots()); switch (type) { - case ALERT_BOLT: - // we might query metadata service query get metadata snapshot and StreamDefinition - AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId); - if (alertSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - prePopulate(alertSpec, state.getPolicySnapshots()); - notifyAlertBolt(alertSpec, sds); - } - break; - case ALERT_PUBLISH_BOLT: - PublishSpec pubSpec = state.getPublishSpecs().get(topologyId); - if (pubSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifyAlertPublishBolt(pubSpec, sds); - } - break; - case SPOUT: - SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId); - if (spoutSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifySpout(spoutSpec, sds); - } - break; - case STREAM_ROUTER_BOLT: - RouterSpec gSpec = state.getGroupSpecs().get(topologyId); - if (gSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifyStreamRouterBolt(gSpec, sds); - } - break; - default: - LOG.error("unexpected metadata type: {} ", type); + case ALERT_BOLT: + // we might query metadata service query get metadata snapshot and StreamDefinition + AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId); + if (alertSpec == null) { + LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); + } else { + prePopulate(alertSpec, state.getPolicySnapshots()); + notifyAlertBolt(alertSpec, sds); + } + break; + case ALERT_PUBLISH_BOLT: + PublishSpec pubSpec = state.getPublishSpecs().get(topologyId); + if (pubSpec == null) { + LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); + } else { + notifyAlertPublishBolt(pubSpec, sds); + } + break; + case SPOUT: + SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId); + if (spoutSpec == null) { + LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); + } else { + notifySpout(spoutSpec, sds); + } + break; + case STREAM_ROUTER_BOLT: + RouterSpec gSpec = state.getGroupSpecs().get(topologyId); + if (gSpec == null) { + LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); + } else { + notifyStreamRouterBolt(gSpec, sds); + } + break; + default: + LOG.error("unexpected metadata type: {} ", type); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java index 4d69bca..d90fd9c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java @@ -62,7 +62,7 @@ public class CompositePolicyHandler implements PolicyStreamHandler { @Override public void send(StreamEvent event) throws Exception { -// policyHandler.send(event); + // policyHandler.send(event); send(event, 0); } @@ -71,8 +71,9 @@ public class CompositePolicyHandler implements PolicyStreamHandler { if (handlers.size() > idx) { handlers.get(idx).send(event); } else if (event instanceof AlertStreamEvent) { - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Emit new alert event: {}", event); + } collector.emit((AlertStreamEvent) event); // for alert stream events, emit if no handler found. } else { // nothing found. LOG, and throw exception http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java index 22f1408..40351ad 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java @@ -1,12 +1,4 @@ -package org.apache.eagle.alert.engine.evaluator; - -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,6 +14,14 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.evaluator; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +import java.util.List; +import java.util.Map; + public interface PolicyChangeListener { void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java index dd3ca24..e970ddd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.evaluator; -import java.io.Serializable; - import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContext; import org.apache.eagle.alert.engine.model.PartitionedEvent; +import java.io.Serializable; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -31,11 +31,11 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent; * Step 3: nextEvent * Step 4: close */ -public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable{ +public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable { void init(StreamContext context, AlertStreamCollector collector); /** - * Evaluate event + * Evaluate event. */ void nextEvent(PartitionedEvent event); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java index 335b237..49f7eed 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java @@ -1,10 +1,4 @@ -package org.apache.eagle.alert.engine.evaluator; - -import backtype.storm.metric.api.MultiCountMetric; -import com.typesafe.config.Config; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -20,6 +14,13 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; * See the License for the specific language governing permissions and * limitations under the License. */ + +package org.apache.eagle.alert.engine.evaluator; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import backtype.storm.metric.api.MultiCountMetric; +import com.typesafe.config.Config; + public class PolicyHandlerContext { private PolicyDefinition policyDefinition; private PolicyGroupEvaluator policyEvaluator; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java index 069d321..7b457b0 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java @@ -1,10 +1,4 @@ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -20,8 +14,16 @@ import org.apache.eagle.alert.engine.model.StreamEvent; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.evaluator; + +import org.apache.eagle.alert.engine.Collector; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.model.StreamEvent; + public interface PolicyStreamHandler { void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception; + void send(StreamEvent event) throws Exception; + void close() throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java index 1e7aacc..116f633 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java @@ -16,8 +16,6 @@ */ package org.apache.eagle.alert.engine.evaluator; -import java.util.Map; - import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler; @@ -27,6 +25,8 @@ import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandl import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1). */ @@ -42,8 +42,8 @@ public class PolicyStreamHandlers { if (SIDDHI_ENGINE.equals(definition.getType())) { return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) { - // no data for an entire stream won't trigger gap alert (use local time & batch window instead) - return new NoDataPolicyTimeBatchHandler(sds); + // no data for an entire stream won't trigger gap alert (use local time & batch window instead) + return new NoDataPolicyTimeBatchHandler(sds); } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) { return new AbsencePolicyHandler(sds); } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java index 3b4aba8..f53139f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java @@ -22,8 +22,8 @@ import org.slf4j.LoggerFactory; import java.util.List; /** + * this assumes that event comes in time order. * Since 7/7/16. - * this assumes that event comes in time order */ public class AbsenceAlertDriver { private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class); @@ -38,7 +38,7 @@ public class AbsenceAlertDriver { public boolean process(List<Object> appearAttrs, long occurTime) { // initialize window - if(processor == null){ + if (processor == null) { processor = nextProcessor(occurTime); LOG.info("initialized a new window {}", processor); } @@ -46,8 +46,8 @@ public class AbsenceAlertDriver { AbsenceWindowProcessor.OccurStatus status = processor.checkStatus(); boolean expired = processor.checkExpired(); boolean isAbsenceAlert = false; - if (expired){ - if(status == AbsenceWindowProcessor.OccurStatus.absent){ + if (expired) { + if (status == AbsenceWindowProcessor.OccurStatus.absent) { // send alert LOG.info("==================="); LOG.info("|| Absence Alert ||"); @@ -63,7 +63,8 @@ public class AbsenceAlertDriver { } /** - * calculate absolute time range based on current timestamp + * calculate absolute time range based on current timestamp. + * * @param currTime milliseconds * @return */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java index ed50280..db4be7c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java @@ -20,7 +20,7 @@ package org.apache.eagle.alert.engine.evaluator.absence; * Since 7/7/16. */ public class AbsenceDailyRule implements AbsenceRule { - public static final long DAY_MILLI_SECONDS = 86400*1000L; + public static final long DAY_MILLI_SECONDS = 86400 * 1000L; public long startOffset; public long endOffset; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java index 826a69d..c372411 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java @@ -32,29 +32,29 @@ import java.util.*; /** * Since 7/6/16. - * * policy would be like: + * * policy would be like: * { - "name": "absenceAlertPolicy", - "description": "absenceAlertPolicy", - "inputStreams": [ - "absenceAlertStream" - ], - "outputStreams": [ - "absenceAlertStream_out" - ], - "definition": { - "type": "absencealert", - "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00" - }, - "partitionSpec": [ - { - "streamId": "absenceAlertStream", - "type": "GROUPBY", - "columns" : ["jobID"] - } - ], - "parallelismHint": 2 - } + * "name": "absenceAlertPolicy", + * "description": "absenceAlertPolicy", + * "inputStreams": [ + * "absenceAlertStream" + * ], + * "outputStreams": [ + * "absenceAlertStream_out" + * ], + * "definition": { + * "type": "absencealert", + * "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00" + * }, + * "partitionSpec": [ + * { + * "streamId": "absenceAlertStream", + * "type": "GROUPBY", + * "columns" : ["jobID"] + * } + * ], + * "parallelismHint": 2 + * } */ public class AbsencePolicyHandler implements PolicyStreamHandler { private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class); @@ -66,7 +66,7 @@ public class AbsencePolicyHandler implements PolicyStreamHandler { private volatile List<Object> expectValues = new ArrayList<>(); private AbsenceAlertDriver driver; - public AbsencePolicyHandler(Map<String, StreamDefinition> sds){ + public AbsencePolicyHandler(Map<String, StreamDefinition> sds) { this.sds = sds; } @@ -77,11 +77,13 @@ public class AbsencePolicyHandler implements PolicyStreamHandler { this.policyDef = context.getPolicyDefinition(); List<String> inputStreams = policyDef.getInputStreams(); // validate inputStreams has to contain only one stream - if(inputStreams.size() != 1) + if (inputStreams.size() != 1) { throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert"); + } // validate outputStream has to contain only one stream - if(policyDef.getOutputStreams().size() != 1) + if (policyDef.getOutputStreams().size() != 1) { throw new IllegalArgumentException("policy outputStream size has to be 1 for absence alert"); + } String is = inputStreams.get(0); StreamDefinition sd = sds.get(is); @@ -94,17 +96,17 @@ public class AbsencePolicyHandler implements PolicyStreamHandler { int offset = 0; // populate wisb field names int numOfFields = Integer.parseInt(segments[offset++]); - for(int i = offset; i < offset+numOfFields; i++){ + for (int i = offset; i < offset + numOfFields; i++) { String fn = segments[i]; expectFieldIndices.add(sd.getColumnIndex(fn)); } offset += numOfFields; - for(int i = offset; i < offset+numOfFields; i++){ + for (int i = offset; i < offset + numOfFields; i++) { String fn = segments[i]; expectValues.add(fn); } offset += numOfFields; - String absence_window_rule_type = segments[offset++]; + String absenceWindowRuleType = segments[offset++]; AbsenceDailyRule rule = new AbsenceDailyRule(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -120,7 +122,7 @@ public class AbsencePolicyHandler implements PolicyStreamHandler { public void send(StreamEvent event) throws Exception { Object[] data = event.getData(); List<Object> columnValues = new ArrayList<>(); - for(int i=0; i<expectFieldIndices.size(); i++){ + for (int i = 0; i < expectFieldIndices.size(); i++) { Object o = data[expectFieldIndices.get(i)]; // convert value to string columnValues.add(o.toString()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java index 728e702..9958dc7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java @@ -27,7 +27,7 @@ public class AbsenceWindow { public long startTime; public long endTime; - public String toString(){ + public String toString() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); String t1 = sdf.format(new Date(startTime)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java index 6cd0880..dfde09a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java @@ -21,20 +21,22 @@ package org.apache.eagle.alert.engine.evaluator.absence; */ public class AbsenceWindowGenerator { private AbsenceRule rule; - public AbsenceWindowGenerator(AbsenceRule rule){ + + public AbsenceWindowGenerator(AbsenceRule rule) { this.rule = rule; } /** - * @param currTime - * @return + * nextWindow. + * + * @param currTime current timestamp */ - public AbsenceWindow nextWindow(long currTime){ + public AbsenceWindow nextWindow(long currTime) { AbsenceWindow window = new AbsenceWindow(); - if(rule instanceof AbsenceDailyRule){ - AbsenceDailyRule r = (AbsenceDailyRule)rule; + if (rule instanceof AbsenceDailyRule) { + AbsenceDailyRule r = (AbsenceDailyRule) rule; long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window - if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){ + if (currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset) { adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS; } // use current timestamp to round down to day @@ -43,7 +45,7 @@ public class AbsenceWindowGenerator { window.startTime = day + r.startOffset; window.endTime = day + r.endOffset; return window; - }else{ + } else { throw new UnsupportedOperationException("not supported rule " + rule); } }