[ https://issues.apache.org/jira/browse/EAGLE-971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhao, Qingwen updated EAGLE-971: -------------------------------- Description: This issue is caused by the wrong routing spec generated by the coordinator. Here is the procedure to reproduce it. 1. Set {{policiesPerBolt = 2, streamsPerBolt = 3, reuseBoltInStreams = true}} in the server config 2. Create four policies that have the same partition and consume the same stream {code} from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.rpc.callqueuelength"]#window.length(2) select site, host, component, metric, min(convert(value, "long")) as minValue group by site, host, component, metric having minValue >= 10000 insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.rpc.callqueuelength"]#window.length(30) select site, host, component, metric, min(convert(value, "long")) as minValue group by site, host, component, metric having minValue >= 10000 insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.hastate.failed.count"]#window.length(2) select site, host, component, metric, timestamp, min(value) as minValue group by site, host, component, metric insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.hastate.failed.count.test"]#window.length(3) select site, host, component, metric, count(value) as cnt group by site, host, component, metric insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT; {code} After creating the four policies, the routing spec is {code} routerSpecs: [ { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: 
null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] }, { partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] }, { partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] } ] } ] {code} and the alert spec is {code} boltPolicyIdsMap: { alertBolt9: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt0: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt1: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt2: [ "NameNodeWithOneNoResponse", 
"NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt3: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ] } {code} 3. produce messages into kafka topic 'hadoop_jmx_metrics_sandbox' and trigger NameNodeWithOneNoResponse. {code} {"timestamp": 1490250963445, "metric": "hadoop.namenode.hastate.failed.count", "component": "namenode", "site": "artemislvs", "value": 0.0, "host": "localhost"} {code} Then one message is sent three times. was: This issue is caused by the wrong routing spec generated by the coordinator. Here is the procedure to reproduce it. 1. setting {{policiesPerBolt = 2, streamsPerBolt = 3, reuseBoltInStreams = true}} in server config 2. create four policies which the same partition and consuming the same streamId {code} from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.rpc.callqueuelength"]#window.length(2) select site, host, component, metric, min(convert(value, "long")) as minValue group by site, host, component, metric having minValue >= 10000 insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.rpc.callqueuelength"]#window.length(30) select site, host, component, metric, min(convert(value, "long")) as minValue group by site, host, component, metric having minValue >= 10000 insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.hastate.failed.count"]#window.length(2) select site, host, component, metric, timestamp, min(value) as minValue group by site, host, component, metric insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == "hadoop.namenode.hastate.failed.count.test"]#window.length(3) select site, host, component, metric, count(value) as cnt group by site, host, component, metric insert into 
HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT; {code} After creating the four policies, the routing spec is {code} routerSpecs: [ { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, targetQueue: [ { partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] }, { partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] }, { partition: { streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", type: "GROUPBY", columns: [ "site", "host", "component", "metric" ], sortSpec: null }, workers: [ { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt9" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt0" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt1" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt2" }, { topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", boltId: "alertBolt3" } ] } ] } ] {code} and the alert spec is {code} 
boltPolicyIdsMap: { alertBolt9: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt0: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt1: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt2: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ], alertBolt3: [ "NameNodeWithOneNoResponse", "NameNodeHAHasNoResponse", "CallQueueLengthExceeds30Times", "CallQueueLengthExceeds2Times" ] } {code} 3. produce messages into kafka topic 'hadoop_jmx_metrics_sandbox' and trigger NameNodeWithOneNoResponse. {code} {"timestamp": 1490250963445, "metric": "hadoop.namenode.hastate.failed.count", "component": "namenode", "site": "artemislvs", "value": 0.0, "host": "localhost"} {code} Then one message is sent three times. > Duplicated queues are generated under a monitored stream > -------------------------------------------------------- > > Key: EAGLE-971 > URL: https://issues.apache.org/jira/browse/EAGLE-971 > Project: Eagle > Issue Type: Bug > Affects Versions: v0.5.0 > Reporter: Zhao, Qingwen > Assignee: Zhao, Qingwen > > This issue is caused by the wrong routing spec generated by the coordinator. > Here is the procedure to reproduce it. > 1. setting {{policiesPerBolt = 2, streamsPerBolt = 3, reuseBoltInStreams = > true}} in server config > 2. 
create four policies that have the same partition and consume the same > stream > {code} > from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == > "hadoop.namenode.rpc.callqueuelength"]#window.length(2) select site, host, > component, metric, min(convert(value, "long")) as minValue group by site, > host, component, metric having minValue >= 10000 insert into > HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; > from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == > "hadoop.namenode.rpc.callqueuelength"]#window.length(30) select site, host, > component, metric, min(convert(value, "long")) as minValue group by site, > host, component, metric having minValue >= 10000 insert into > HADOOP_JMX_METRIC_STREAM_SANDBOX_CALL_QUEUE_EXCEEDS_OUT; > from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == > "hadoop.namenode.hastate.failed.count"]#window.length(2) select site, host, > component, metric, timestamp, min(value) as minValue group by site, host, > component, metric insert into > HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT > from HADOOP_JMX_METRIC_STREAM_SANDBOX[metric == > "hadoop.namenode.hastate.failed.count.test"]#window.length(3) select site, > host, component, metric, count(value) as cnt group by site, host, component, > metric insert into HADOOP_JMX_METRIC_STREAM_SANDBOX_NN_NO_RESPONSE_OUT; > {code} > After creating the four policies, the routing spec is > {code} > routerSpecs: [ > { > streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", > partition: { > streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", > type: "GROUPBY", > columns: [ > "site", > "host", > "component", > "metric" > ], > sortSpec: null > }, > targetQueue: [ > { > partition: { > streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", > type: "GROUPBY", > columns: [ > "site", > "host", > "component", > "metric" > ], > sortSpec: null > }, > workers: [ > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt9" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt0" > }, > { > 
topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt1" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt2" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt3" > } > ] > }, > { > partition: { > streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", > type: "GROUPBY", > columns: [ > "site", > "host", > "component", > "metric" > ], > sortSpec: null > }, > workers: [ > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt9" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt0" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt1" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt2" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt3" > } > ] > }, > { > partition: { > streamId: "HADOOP_JMX_METRIC_STREAM_SANDBOX", > type: "GROUPBY", > columns: [ > "site", > "host", > "component", > "metric" > ], > sortSpec: null > }, > workers: [ > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt9" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt0" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt1" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt2" > }, > { > topologyName: "ALERT_UNIT_TOPOLOGY_APP_SANDBOX", > boltId: "alertBolt3" > } > ] > } > ] > } > ] > {code} > and the alert spec is > {code} > boltPolicyIdsMap: { > alertBolt9: [ > "NameNodeWithOneNoResponse", > "NameNodeHAHasNoResponse", > "CallQueueLengthExceeds30Times", > "CallQueueLengthExceeds2Times" > ], > alertBolt0: [ > "NameNodeWithOneNoResponse", > "NameNodeHAHasNoResponse", > "CallQueueLengthExceeds30Times", > "CallQueueLengthExceeds2Times" > ], > alertBolt1: [ > "NameNodeWithOneNoResponse", > "NameNodeHAHasNoResponse", > "CallQueueLengthExceeds30Times", > "CallQueueLengthExceeds2Times" > ], > 
alertBolt2: [ > "NameNodeWithOneNoResponse", > "NameNodeHAHasNoResponse", > "CallQueueLengthExceeds30Times", > "CallQueueLengthExceeds2Times" > ], > alertBolt3: [ > "NameNodeWithOneNoResponse", > "NameNodeHAHasNoResponse", > "CallQueueLengthExceeds30Times", > "CallQueueLengthExceeds2Times" > ] > } > {code} > 3. Produce messages to the Kafka topic 'hadoop_jmx_metrics_sandbox' to trigger > NameNodeWithOneNoResponse. > {code} > {"timestamp": 1490250963445, "metric": > "hadoop.namenode.hastate.failed.count", "component": "namenode", "site": > "artemislvs", "value": 0.0, "host": "localhost"} > {code} > As a result, each message is sent three times, once for each of the three > identical queues in the routing spec. -- This message was sent by Atlassian JIRA (v6.3.15#6346)