http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java index f50ad15..fe70630 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java @@ -28,20 +28,20 @@ import org.wso2.siddhi.core.util.EventPrinter; */ public class TestNoDataAlert { @Test - public void test() throws Exception{ - String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"}; + public void test() throws Exception { + String[] expectHosts = new String[] {"host_1", "host_2", "host_3", "host_4", "host_5", "host_6", "host_7", "host_8"}; // String[] appearHosts = new String[]{"host_6","host_7","host_8"}; // String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"}; ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( - "define stream appearStream (key string, src string);"+ - "define stream expectStream (key string, src string);"+ - "define table expectTable (key string, src string);"+ - "define trigger fiveSecTriggerStream at every 1 sec;"+ - "define trigger initAppearTriggerStream at 'start';"+ - "from expectStream insert into expectTable;"+ - "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+ - "from initAppearTriggerStream join expectTable insert into initAppearStream;" + "define stream appearStream (key string, src string);" + + "define stream expectStream (key string, src string);" + + "define table expectTable (key string, src string);" + + "define trigger fiveSecTriggerStream at every 1 sec;" + + "define trigger initAppearTriggerStream at 'start';" + + "from expectStream insert into expectTable;" + + "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;" + + "from initAppearTriggerStream join expectTable insert into initAppearStream;" // "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" + // "from joinStream[k2 is null] select k1 insert current events into missingStream;" ); @@ -65,8 +65,8 @@ public class TestNoDataAlert { }); runtime.start(); - for(String host: expectHosts) { - runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"}); + for (String host : expectHosts) { + runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {host, "expectStream"}); } // for(String host:appearHosts) { @@ -83,17 +83,17 @@ public class TestNoDataAlert { /** * only alert when the successive 2 events has number of missing blocks changed - *from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp; + * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp; */ @Test - public void testMissingBlock() throws Exception{ + public void testMissingBlock() throws Exception { ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( - "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+ - "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+ - "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+ - "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+ - "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " + - "b.site as site insert into outputStream;" + "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" + + "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " + + "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " + + "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, " + + "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " + + "b.site as site insert into outputStream;" ); runtime.addCallback("outputStream", new StreamCallback() { @@ -104,9 +104,9 @@ public class TestNoDataAlert { }); runtime.start(); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L}); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L}); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L}); + runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L}); + runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L}); + runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L}); Thread.sleep(5000);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java index 6305da8..5564b90 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java @@ -16,10 +16,6 @@ */ package org.apache.eagle.alert.engine.nodata; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - import org.apache.eagle.alert.engine.Collector; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamColumn; @@ -33,6 +29,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + /** * Since 6/29/16. */ @@ -42,13 +42,13 @@ public class TestNoDataPolicyHandler { private static final String outputStream = "testOutputStream"; @Test - public void test() throws Exception{ + public void test() throws Exception { test(buildPolicyDef_provided()); test(buildPolicyDef_dynamic()); } @SuppressWarnings("unchecked") - public void test(PolicyDefinition pd) throws Exception{ + public void test(PolicyDefinition pd) throws Exception { Map<String, StreamDefinition> sds = new HashMap<>(); StreamDefinition sd = buildStreamDef(); sds.put("testInputStream", sd); @@ -71,17 +71,17 @@ public class TestNoDataPolicyHandler { } @SuppressWarnings("rawtypes") - private static class TestCollector implements Collector{ + private static class TestCollector implements Collector { @Override public void emit(Object o) { - AlertStreamEvent e = (AlertStreamEvent)o; + AlertStreamEvent e = (AlertStreamEvent) o; Object[] data = e.getData(); Assert.assertEquals("host2", data[1]); LOG.info(e.toString()); } } - private PolicyDefinition buildPolicyDef_provided(){ + private PolicyDefinition buildPolicyDef_provided() { PolicyDefinition pd = new PolicyDefinition(); PolicyDefinition.Definition def = new PolicyDefinition.Definition(); def.setValue("PT1M,provided,1,host,host1,host2"); @@ -93,7 +93,7 @@ public class TestNoDataPolicyHandler { return pd; } - private PolicyDefinition buildPolicyDef_dynamic(){ + private PolicyDefinition buildPolicyDef_dynamic() { PolicyDefinition pd = new PolicyDefinition(); PolicyDefinition.Definition def = new PolicyDefinition.Definition(); def.setValue("PT1M,dynamic,1,host"); @@ -104,7 +104,8 @@ public class TestNoDataPolicyHandler { pd.setName("nodataalert-test"); return pd; } - private StreamDefinition buildStreamDef(){ + + private StreamDefinition buildStreamDef() { StreamDefinition sd = new StreamDefinition(); StreamColumn tsColumn = new StreamColumn(); tsColumn.setName("timestamp"); @@ -124,9 +125,9 @@ public class TestNoDataPolicyHandler { return sd; } - private StreamEvent buildStreamEvt(long ts, String host, double value){ + private StreamEvent buildStreamEvt(long ts, String host, double value) { StreamEvent e = new StreamEvent(); - e.setData(new Object[]{ts, host, value}); + e.setData(new Object[] {ts, host, value}); e.setStreamId(inputStream); e.setTimestamp(ts); return e; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java index 02d19b4..84844e7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java @@ -16,10 +16,6 @@ */ package org.apache.eagle.alert.engine.nodata; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - import org.apache.eagle.alert.engine.Collector; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamColumn; @@ -34,125 +30,129 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + public class TestNoDataPolicyTimeBatchHandler { - private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class); - - private static final String inputStream = "testInputStream"; - private static final String outputStream = "testOutputStream"; - - @Before - public void setup() { - } - - @SuppressWarnings("unchecked") - @Test - public void testDynamic1() throws Exception { - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put("testInputStream", buildStreamDef()); - sds.put("testOutputStream", buildOutputStreamDef()); - NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds); - - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyDefinition(buildPolicyDef_dynamic()); - handler.prepare(new TestCollector(), context); - - long now = System.currentTimeMillis(); - - handler.send(buildStreamEvt(now, "host1", 12.5)); - - Thread.sleep(2000); - - handler.send(buildStreamEvt(now, "host2", 12.6)); - handler.send(buildStreamEvt(now, "host1", 20.9)); - handler.send(buildStreamEvt(now, "host2", 22.1)); - handler.send(buildStreamEvt(now, "host2", 22.1)); - - Thread.sleep(5000); - - handler.send(buildStreamEvt(now, "host2", 22.1)); - handler.send(buildStreamEvt(now, "host2", 22.3)); - - Thread.sleep(5000); - - handler.send(buildStreamEvt(now, "host2", 22.9)); - handler.send(buildStreamEvt(now, "host1", 41.6)); - handler.send(buildStreamEvt(now, "host2", 45.6)); - - Thread.sleep(1000); - } - - @SuppressWarnings("rawtypes") - private static class TestCollector implements Collector{ + private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class); + + private static final String inputStream = "testInputStream"; + private static final String outputStream = "testOutputStream"; + + @Before + public void setup() { + } + + @SuppressWarnings("unchecked") + @Test + public void testDynamic1() throws Exception { + Map<String, StreamDefinition> sds = new HashMap<>(); + sds.put("testInputStream", buildStreamDef()); + sds.put("testOutputStream", buildOutputStreamDef()); + NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds); + + PolicyHandlerContext context = new PolicyHandlerContext(); + context.setPolicyDefinition(buildPolicyDef_dynamic()); + handler.prepare(new TestCollector(), context); + + long now = System.currentTimeMillis(); + + handler.send(buildStreamEvt(now, "host1", 12.5)); + + Thread.sleep(2000); + + handler.send(buildStreamEvt(now, "host2", 12.6)); + handler.send(buildStreamEvt(now, "host1", 20.9)); + handler.send(buildStreamEvt(now, "host2", 22.1)); + handler.send(buildStreamEvt(now, "host2", 22.1)); + + Thread.sleep(5000); + + handler.send(buildStreamEvt(now, "host2", 22.1)); + handler.send(buildStreamEvt(now, "host2", 22.3)); + + Thread.sleep(5000); + + handler.send(buildStreamEvt(now, "host2", 22.9)); + handler.send(buildStreamEvt(now, "host1", 41.6)); + handler.send(buildStreamEvt(now, "host2", 45.6)); + + Thread.sleep(1000); + } + + @SuppressWarnings("rawtypes") + private static class TestCollector implements Collector { @Override public void emit(Object o) { - AlertStreamEvent e = (AlertStreamEvent)o; + AlertStreamEvent e = (AlertStreamEvent) o; Object[] data = e.getData(); - + LOG.info("alert data: {}, {}", data[1], data[0]); - + Assert.assertEquals("host1", data[1]); } } - private PolicyDefinition buildPolicyDef_dynamic() { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("PT5S,dynamic,1,host"); - def.setType("nodataalert"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList(inputStream)); - pd.setOutputStreams(Arrays.asList(outputStream)); - pd.setName("nodataalert-test"); - return pd; - } - - private StreamDefinition buildStreamDef() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("value"); - valueColumn.setType(StreamColumn.Type.DOUBLE); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testInputStream"); - return sd; - } - - private StreamDefinition buildOutputStreamDef() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("originalStreamName"); - valueColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testOutputStream"); - return sd; - } - - private StreamEvent buildStreamEvt(long ts, String host, double value) { - StreamEvent e = new StreamEvent(); - e.setData(new Object[] { ts, host, value }); - e.setStreamId(inputStream); - e.setTimestamp(ts); - return e; - } + private PolicyDefinition buildPolicyDef_dynamic() { + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("PT5S,dynamic,1,host"); + def.setType("nodataalert"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList(inputStream)); + pd.setOutputStreams(Arrays.asList(outputStream)); + pd.setName("nodataalert-test"); + return pd; + } + + private StreamDefinition buildStreamDef() { + StreamDefinition sd = new StreamDefinition(); + StreamColumn tsColumn = new StreamColumn(); + tsColumn.setName("timestamp"); + tsColumn.setType(StreamColumn.Type.LONG); + + StreamColumn hostColumn = new StreamColumn(); + hostColumn.setName("host"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn valueColumn = new StreamColumn(); + valueColumn.setName("value"); + valueColumn.setType(StreamColumn.Type.DOUBLE); + + sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); + sd.setDataSource("testDataSource"); + sd.setStreamId("testInputStream"); + return sd; + } + + private StreamDefinition buildOutputStreamDef() { + StreamDefinition sd = new StreamDefinition(); + StreamColumn tsColumn = new StreamColumn(); + tsColumn.setName("timestamp"); + tsColumn.setType(StreamColumn.Type.LONG); + + StreamColumn hostColumn = new StreamColumn(); + hostColumn.setName("host"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn valueColumn = new StreamColumn(); + valueColumn.setName("originalStreamName"); + valueColumn.setType(StreamColumn.Type.STRING); + + sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); + sd.setDataSource("testDataSource"); + sd.setStreamId("testOutputStream"); + return sd; + } + + private StreamEvent buildStreamEvt(long ts, String host, double value) { + StreamEvent e = new StreamEvent(); + e.setData(new Object[] {ts, host, value}); + e.setStreamId(inputStream); + e.setTimestamp(ts); + return e; + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java index f2027b2..77ab9c3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java @@ -16,12 +16,9 @@ */ package org.apache.eagle.alert.engine.perf; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.commons.io.FilenameUtils; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; @@ -31,23 +28,26 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; /** * Since 5/13/16. */ public class TestSerDeserPer { Object[] data = null; + @Before - public void before(){ + public void before() { int max = 100; StringBuilder sb = new StringBuilder(); - for(int i=0; i<max; i++){ + for (int i = 0; i < max; i++) { sb.append("a"); } - data = new Object[]{sb.toString()}; + data = new Object[] {sb.toString()}; } private String getTmpPath() { @@ -55,11 +55,11 @@ public class TestSerDeserPer { } @Test - public void testSerDeserPerf() throws Exception{ + public void testSerDeserPerf() throws Exception { Kryo kryo = new Kryo(); String outputPath = FilenameUtils.concat(getTmpPath(), "file.bin"); Output output = new Output(new FileOutputStream(outputPath)); - for(int i=0; i<1000; i++){ + for (int i = 0; i < 1000; i++) { kryo.writeObject(output, constructPE()); } output.close(); @@ -69,7 +69,7 @@ public class TestSerDeserPer { Assert.assertTrue(someObject.getData().length == 1); } - private PartitionedEvent constructPE(){ + private PartitionedEvent constructPE() { StreamEvent e = new StreamEvent(); e.setStreamId("testStreamId"); e.setTimestamp(1463159382000L); @@ -92,11 +92,11 @@ public class TestSerDeserPer { } @Test - public void testSerDeserPerf2() throws Exception{ + public void testSerDeserPerf2() throws Exception { Kryo kryo = new Kryo(); String outputPath = FilenameUtils.concat(getTmpPath(), "file2.bin"); Output output = new Output(new FileOutputStream(outputPath)); - for(int i=0; i<1000; i++){ + for (int i = 0; i < 1000; i++) { kryo.writeObject(output, constructNewPE()); } output.close(); @@ -106,7 +106,7 @@ public class TestSerDeserPer { Assert.assertTrue(someObject.getData().length == 1); } - private NewPartitionedEvent constructNewPE(){ + private NewPartitionedEvent constructNewPE() { NewPartitionedEvent pe = new NewPartitionedEvent(); pe.setStreamId("testStreamId"); pe.setTimestamp(1463159382000L); @@ -124,11 +124,11 @@ public class TestSerDeserPer { } @Test - public void testSerDeserPerf3() throws Exception{ + public void testSerDeserPerf3() throws Exception { Kryo kryo = new Kryo(); String outputPath = FilenameUtils.concat(getTmpPath(), "file3.bin"); Output output = new Output(new FileOutputStream(outputPath)); - for(int i=0; i<1000; i++){ + for (int i = 0; i < 1000; i++) { kryo.writeObject(output, constructNewPE2()); } output.close(); @@ -138,7 +138,7 @@ public class TestSerDeserPer { Assert.assertTrue(someObject.getData().length == 1); } - private NewPartitionedEvent2 constructNewPE2(){ + private NewPartitionedEvent2 constructNewPE2() { NewPartitionedEvent2 pe = new NewPartitionedEvent2(); pe.setStreamId(100); pe.setTimestamp(1463159382000L); @@ -168,10 +168,10 @@ public class TestSerDeserPer { private long partitionKey; // sort spec - private String windowPeriod=""; + private String windowPeriod = ""; private long windowMargin = 30 * 1000; - public NewPartitionedEvent(){ + public NewPartitionedEvent() { } public String getStreamId() { @@ -255,7 +255,7 @@ public class TestSerDeserPer { private long windowPeriod; private long windowMargin = 30 * 1000; - public NewPartitionedEvent2(){ + public NewPartitionedEvent2() { } public int getStreamId() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java index f3548d8..4bec98d 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java @@ -58,7 +58,7 @@ import static org.mockito.Mockito.when; /** * Since 5/2/16. */ -@SuppressWarnings({"rawtypes", "unused"}) +@SuppressWarnings( {"rawtypes", "unused"}) public class TestAlertBolt { public static final String TEST_STREAM = "test-stream"; @@ -66,22 +66,20 @@ public class TestAlertBolt { /** * Following knowledge is guaranteed in * + * @throws Exception Add test case: 2 alerts should be generated even if they are very close to each other in timestamp * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{ - * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){ - * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName()); - * } + * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){ + * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName()); + * } * } - * - * @throws Exception - * - * Add test case: 2 alerts should be generated even if they are very close to each other in timestamp */ @Test - public void testAlertBolt() throws Exception{ + public void testAlertBolt() throws Exception { final AtomicInteger alertCount = new AtomicInteger(); final Semaphore mutex = new Semaphore(0); - OutputCollector collector = new OutputCollector(new IOutputCollector(){ + OutputCollector collector = new OutputCollector(new IOutputCollector() { int count = 0; + @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { alertCount.incrementAndGet(); @@ -91,14 +89,22 @@ public class TestAlertBolt { System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); return null; } + @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { } + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + @Override - public void ack(Tuple input) { } + public void ack(Tuple input) { + } + @Override - public void fail(Tuple input) { } + public void fail(Tuple input) { + } + @Override - public void reportError(Throwable error) { } + public void reportError(Throwable error) { + } }); AlertBolt bolt = createAlertBolt(collector); @@ -143,27 +149,27 @@ public class TestAlertBolt { // construct event with "value1" StreamEvent event1 = new StreamEvent(); - event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000); + event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); event1.setMetaVersion("version1"); - Object[] data = new Object[]{"value1"}; + Object[] data = new Object[] {"value1"}; event1.setData(data); event1.setStreamId(streamId); - PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001); + PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001); // construct another event with "value1" StreamEvent event2 = new StreamEvent(); - event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000); + event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); event2.setMetaVersion("version1"); - data = new Object[]{"value2"}; + data = new Object[] {"value2"}; event2.setData(data); event2.setStreamId(streamId); - PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001); + PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001); Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default"); Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default"); bolt.execute(input); bolt.execute(input2); - Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS)); + Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(2, 5, TimeUnit.SECONDS)); Assert.assertEquals(2, alertCount.get()); bolt.cleanup(); } @@ -183,8 +189,9 @@ public class TestAlertBolt { @Test public void testMetadataMismatch() throws Exception { AtomicInteger failedCount = new AtomicInteger(); - OutputCollector collector = new OutputCollector(new IOutputCollector(){ + OutputCollector collector = new OutputCollector(new IOutputCollector() { int count = 0; + @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { Assert.assertEquals("testAlertStream", tuple.get(0)); @@ -192,14 +199,23 @@ public class TestAlertBolt { System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); return null; } + @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { } + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + @Override - public void ack(Tuple input) { } + public void ack(Tuple input) { + } + @Override - public void fail(Tuple input) { failedCount.incrementAndGet(); } + public void fail(Tuple input) { + failedCount.incrementAndGet(); + } + @Override - public void reportError(Throwable error) { } + public void reportError(Throwable error) { + } }); AlertBolt bolt = createAlertBolt(collector); @@ -267,8 +283,9 @@ public class TestAlertBolt { @Test public void testMetaversionConflict() throws Exception { AtomicInteger failedCount = new AtomicInteger(); - OutputCollector collector = new OutputCollector(new IOutputCollector(){ + OutputCollector collector = new OutputCollector(new IOutputCollector() { int count = 0; + @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { Assert.assertEquals("testAlertStream", tuple.get(0)); @@ -276,14 +293,23 @@ public class TestAlertBolt { System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); return null; } + @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { } + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + @Override - public void ack(Tuple input) { } + public void ack(Tuple input) { + } + @Override - public void fail(Tuple input) { failedCount.incrementAndGet(); } + public void fail(Tuple input) { + failedCount.incrementAndGet(); + } + @Override - public void reportError(Throwable error) { } + public void reportError(Throwable error) { + } }); AlertBolt bolt = createAlertBolt(collector); @@ -374,21 +400,25 @@ public class TestAlertBolt { } @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {} + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } @Override - public void ack(Tuple input) {} + public void ack(Tuple input) { + } @Override - public void fail(Tuple input) {} + public void fail(Tuple input) { + } @Override - public void reportError(Throwable error) {} + public void reportError(Throwable error) { + } }); AlertBolt bolt = createAlertBolt(collector); boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - boltSpecs.setVersion("spec_"+System.currentTimeMillis()); + boltSpecs.setVersion("spec_" + System.currentTimeMillis()); // stream def map Map<String, StreamDefinition> sds = new HashMap(); StreamDefinition sdTest = new StreamDefinition(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index 1e52036..61a0aba 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -66,13 +66,13 @@ public class TestAlertPublisherBolt { publisher.nextEvent(event1); } - private AlertStreamEvent create(String streamId){ + private AlertStreamEvent create(String streamId) { AlertStreamEvent alert = new AlertStreamEvent(); PolicyDefinition policy = new PolicyDefinition(); policy.setName("policy1"); alert.setPolicyId(policy.getName()); alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[]{"field_1", 2, "field_3"}); + alert.setData(new Object[] {"field_1", 2, "field_3"}); alert.setStreamId(streamId); alert.setCreatedBy(this.toString()); return alert; @@ -165,13 +165,13 @@ public class TestAlertPublisherBolt { return l; } - private AlertStreamEvent createWithStreamDef(String hostname, String appName){ + private AlertStreamEvent createWithStreamDef(String hostname, String appName) { AlertStreamEvent alert = new AlertStreamEvent(); PolicyDefinition policy = new PolicyDefinition(); policy.setName("perfmon_cpu_host_check"); alert.setPolicyId(policy.getName()); alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[]{appName, hostname}); + alert.setData(new Object[] {appName, hostname}); alert.setStreamId("testAlertStream"); alert.setCreatedBy(this.toString()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java index 13550e1..830f1f7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java @@ -16,19 +16,16 @@ */ package org.apache.eagle.alert.engine.runner; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.GeneralTopologyContext; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleImpl; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; import org.apache.eagle.alert.coordination.model.RouterSpec; import org.apache.eagle.alert.coordination.model.StreamRouterSpec; @@ -40,7 +37,6 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl; import org.apache.eagle.alert.utils.DateTimeUtil; import org.apache.eagle.alert.utils.StreamIdConversion; import org.joda.time.Period; @@ -49,43 +45,39 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; +import java.io.IOException; +import java.util.*; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestStreamRouterBolt { private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class); /** * Mocked 5 Events - * + * <p> * 1. Sent in random order: * "value1","value2","value3","value4","value5" - * + * <p> * 2. Received correct time order and value5 is thrown because too late: "value2","value1","value3","value4" * * @throws Exception */ @SuppressWarnings("rawtypes") @Test - public void testRouterWithSortAndRouteSpec() throws Exception{ + public void testRouterWithSortAndRouteSpec() throws Exception { Config config = ConfigFactory.load(); MockChangeService mockChangeService = new MockChangeService(); StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService); - final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>(); + final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>(); final List<PartitionedEvent> orderCollected = new ArrayList<>(); - OutputCollector collector = new OutputCollector(new IOutputCollector(){ + OutputCollector collector = new OutputCollector(new IOutputCollector() { int count = 0; + @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { PartitionedEvent event; @@ -94,27 +86,37 @@ public class TestStreamRouterBolt { } catch (IOException e) { throw new RuntimeException(e); } - if(count == 0) { + if (count == 0) { count++; } LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); - if(!streamCollected.containsKey(streamId)){ - streamCollected.put(streamId,new ArrayList<>()); + if (!streamCollected.containsKey(streamId)) { + streamCollected.put(streamId, new ArrayList<>()); } streamCollected.get(streamId).add(event); orderCollected.add(event); return null; } + @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { } + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + @Override - public void ack(Tuple input) { } + public void ack(Tuple input) { + } + @Override - public void fail(Tuple input) { } + public void fail(Tuple input) { + } + @SuppressWarnings("unused") - public void resetTimeout(Tuple input) { } + public void resetTimeout(Tuple input) { + } + @Override - public void reportError(Throwable error) { } + public void reportError(Throwable error) { + } }); Map stormConf = new HashMap<>(); @@ -144,7 +146,7 @@ public class TestStreamRouterBolt { routerSpec.setStreamId(streamId); PolicyWorkerQueue queue = new PolicyWorkerQueue(); queue.setPartition(sp); - queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2"))); + queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2"))); routerSpec.setTargetQueue(Collections.singletonList(queue)); boltSpec.addRouterSpec(routerSpec); boltSpec.setVersion("version1"); @@ -178,8 +180,8 @@ public class TestStreamRouterBolt { // construct event with "value1" StreamEvent event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000); - Object[] data = new Object[]{"value1"}; + event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30") * 1000); + Object[] data = new Object[] {"value1"}; event.setData(data); event.setStreamId(streamId); PartitionedEvent pEvent = new PartitionedEvent(); @@ -190,8 +192,8 @@ public class TestStreamRouterBolt { // construct another event with "value2" event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000); - data = new Object[]{"value2"}; + event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10") * 1000); + data = new Object[] {"value2"}; event.setData(data); event.setStreamId(streamId); pEvent = new PartitionedEvent(); @@ -202,8 +204,8 @@ public class TestStreamRouterBolt { // construct another event with "value3" event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000); - data = new Object[]{"value3"}; + event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40") * 1000); + data = new Object[] {"value3"}; event.setData(data); event.setStreamId(streamId); pEvent = new PartitionedEvent(); @@ -214,8 +216,8 @@ public class TestStreamRouterBolt { // construct another event with "value4" event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000); - data = new Object[]{"value4"}; + event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10") * 1000); + data = new Object[] {"value4"}; event.setData(data); event.setStreamId(streamId); pEvent = new PartitionedEvent(); @@ -226,8 +228,8 @@ public class TestStreamRouterBolt { // construct another event with "value5", which will be thrown because two late event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000); - data = new Object[]{"value5"}; + event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10") * 1000); + data = new Object[] {"value5"}; event.setData(data); event.setStreamId(streamId); pEvent = new PartitionedEvent(); @@ -236,14 +238,14 @@ public class TestStreamRouterBolt { input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); routerBolt.execute(input); - Assert.assertEquals("Should finally collect two streams",2,streamCollected.size()); - Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",streamCollected.keySet().contains( - String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1")))); - Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",streamCollected.keySet().contains( - String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2")))); + Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size()); + Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1", streamCollected.keySet().contains( + String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1")))); + Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2", streamCollected.keySet().contains( + String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2")))); - Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size()); - Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray()); + Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size()); + Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray()); // The first 3 events are ticked automatically by window @@ -252,14 +254,14 @@ public class TestStreamRouterBolt { // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock // The 5th event will be thrown because too late and out of margin - Assert.assertEquals("Should finally collect two streams",2,streamCollected.size()); - Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size()); - Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray()); + Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size()); + Assert.assertEquals("Should finally collect 3 events", 4, orderCollected.size()); + Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray()); } @SuppressWarnings("serial") - public static class MockChangeService extends AbstractMetadataChangeNotifyService{ + public static class MockChangeService extends AbstractMetadataChangeNotifyService { private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class); @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java index 13d1015..a3939cc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java @@ -1,10 +1,5 @@ package org.apache.eagle.alert.engine.serialization; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - import org.apache.commons.lang3.SerializationUtils; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; @@ -17,6 +12,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -37,18 +37,18 @@ public class JavaSerializationTest { private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class); @Test - public void testJavaSerialization(){ + public void testJavaSerialization() { PartitionedEvent partitionedEvent = new PartitionedEvent(); partitionedEvent.setPartitionKey(partitionedEvent.hashCode()); - partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host"))); + partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host"))); StreamEvent event = new StreamEvent(); event.setStreamId("sampleStream"); event.setTimestamp(System.currentTimeMillis()); - event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0}); + event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0}); partitionedEvent.setEvent(event); int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length; - LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent); + LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent); int compactLength = 0; compactLength += "sampleStream".getBytes().length; @@ -60,17 +60,17 @@ public class JavaSerializationTest { compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length; compactLength += ByteUtils.doubleToBytes(60.0).length; - LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent); + LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent); Assert.assertTrue(compactLength * 20 < javaSerializationLength); } - public static StreamDefinition createSampleStreamDefinition(String streamId){ + public static StreamDefinition createSampleStreamDefinition(String streamId) { StreamDefinition sampleStreamDefinition = new StreamDefinition(); sampleStreamDefinition.setStreamId(streamId); sampleStreamDefinition.setTimeseries(true); sampleStreamDefinition.setValidate(true); - sampleStreamDefinition.setDescription("Schema for "+streamId); + sampleStreamDefinition.setDescription("Schema for " + streamId); List<StreamColumn> streamColumns = new ArrayList<>(); streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); @@ -82,7 +82,7 @@ public class JavaSerializationTest { return sampleStreamDefinition; } - public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){ + public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) { StreamPartition streamPartition = new StreamPartition(); streamPartition.setStreamId(streamId); streamPartition.setColumns(groupByField); @@ -91,22 +91,22 @@ public class JavaSerializationTest { } @SuppressWarnings("serial") - public static PartitionedEvent createSimpleStreamEvent() { - StreamEvent event = StreamEvent.Builder() - .schema(createSampleStreamDefinition("sampleStream_1")) - .streamId("sampleStream_1") - .timestamep(System.currentTimeMillis()) - .attributes(new HashMap<String,Object>(){{ - put("name","cpu"); - put("host","localhost"); - put("flag",true); - put("value",60.0); - put("data",Long.MAX_VALUE); - put("unknown","unknown column value"); - }}).build(); + public static PartitionedEvent createSimpleStreamEvent() { + StreamEvent event = StreamEvent.builder() + .schema(createSampleStreamDefinition("sampleStream_1")) + .streamId("sampleStream_1") + .timestamep(System.currentTimeMillis()) + .attributes(new HashMap<String, Object>() {{ + put("name", "cpu"); + put("host", "localhost"); + put("flag", true); + put("value", 60.0); + put("data", Long.MAX_VALUE); + put("unknown", "unknown column value"); + }}).build(); PartitionedEvent pEvent = new PartitionedEvent(); pEvent.setEvent(event); - pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host"))); + pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host"))); return pEvent; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java index a756ebe..4241c3c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java @@ -16,9 +16,14 @@ */ package org.apache.eagle.alert.engine.serialization; -import java.io.IOException; -import java.util.BitSet; - +import backtype.storm.serialization.DefaultKryoFactory; +import backtype.storm.serialization.DefaultSerializationDelegate; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; import org.apache.commons.lang.time.StopWatch; import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; import org.apache.eagle.alert.engine.model.PartitionedEvent; @@ -30,66 +35,63 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.serialization.DefaultKryoFactory; -import backtype.storm.serialization.DefaultSerializationDelegate; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.util.BitSet; public class PartitionedEventSerializerTest { private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class); + @SuppressWarnings("deprecation") @Test public void testPartitionEventSerialization() throws IOException { - PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());; + PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis()); + ; PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition); ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput(); - serializer.serialize(partitionedEvent,dataOutput1); + serializer.serialize(partitionedEvent, dataOutput1); byte[] serializedBytes = dataOutput1.toByteArray(); PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes)); - Assert.assertEquals(partitionedEvent,deserializedEvent); + Assert.assertEquals(partitionedEvent, deserializedEvent); - PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true); + PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true); byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent); PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed); - Assert.assertEquals(partitionedEvent,deserializedEventCompressed); + Assert.assertEquals(partitionedEvent, deserializedEventCompressed); PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition); ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput(); - serializer2.serialize(partitionedEvent,dataOutput2); + serializer2.serialize(partitionedEvent, dataOutput2); byte[] serializedBytes2 = dataOutput2.toByteArray(); ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2); PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2); - Assert.assertEquals(partitionedEvent,deserializedEvent2); + Assert.assertEquals(partitionedEvent, deserializedEvent2); byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent); Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); Output output = new Output(10000); - kryo.writeClassAndObject(output,partitionedEvent); + kryo.writeClassAndObject(output, partitionedEvent); byte[] kryoBytes = output.toBytes(); Input input = new Input(kryoBytes); PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input); - Assert.assertEquals(partitionedEvent,kryoDeserializedEvent); - LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length); + Assert.assertEquals(partitionedEvent, kryoDeserializedEvent); + LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length); } + @SuppressWarnings("deprecation") @Test public void testPartitionEventSerializationEfficiency() throws IOException { - PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());; + PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis()); + ; PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition); int count = 100000; StopWatch stopWatch = new StopWatch(); stopWatch.start(); int i = 0; - while(i<count) { + while (i < count) { ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput(); serializer.serialize(partitionedEvent, dataOutput1); byte[] serializedBytes = dataOutput1.toByteArray(); @@ -98,24 +100,24 @@ public class PartitionedEventSerializerTest { i++; } stopWatch.stop(); - LOG.info("Cached Stream: {} ms",stopWatch.getTime()); + LOG.info("Cached Stream: {} ms", stopWatch.getTime()); stopWatch.reset(); - PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true); + PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true); i = 0; stopWatch.start(); - while(i<count) { + while (i < count) { byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent); PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed); Assert.assertEquals(partitionedEvent, deserializedEventCompressed); i++; } stopWatch.stop(); - LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime()); + LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime()); stopWatch.reset(); i = 0; stopWatch.start(); - while(i<count) { + while (i < count) { PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition); ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput(); serializer2.serialize(partitionedEvent, dataOutput2); @@ -126,23 +128,23 @@ public class PartitionedEventSerializerTest { i++; } stopWatch.stop(); - LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime()); + LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime()); stopWatch.reset(); i = 0; stopWatch.start(); - while(i<count) { + while (i < count) { byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent); PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization); Assert.assertEquals(partitionedEvent, javaSerializedEvent); i++; } stopWatch.stop(); - LOG.info("Java Native: {} ms",stopWatch.getTime()); + LOG.info("Java Native: {} ms", stopWatch.getTime()); stopWatch.reset(); i = 0; stopWatch.start(); Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - while(i<count) { + while (i < count) { Output output = new Output(10000); kryo.writeClassAndObject(output, partitionedEvent); byte[] kryoBytes = output.toBytes(); @@ -152,73 +154,73 @@ public class PartitionedEventSerializerTest { i++; } stopWatch.stop(); - LOG.info("Kryo: {} ms",stopWatch.getTime()); + LOG.info("Kryo: {} ms", stopWatch.getTime()); } /** * Kryo Serialization Length = Length of byte[] + 2 */ @Test - public void testKryoByteArraySerialization(){ + public void testKryoByteArraySerialization() { Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9}; + byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; Output output = new Output(1000); - kryo.writeObject(output,bytes); - Assert.assertEquals(bytes.length + 2,output.toBytes().length); + kryo.writeObject(output, bytes); + Assert.assertEquals(bytes.length + 2, output.toBytes().length); } - private byte[] kryoSerialize(Object object){ + private byte[] kryoSerialize(Object object) { Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); Output output = new Output(100000); - kryo.writeClassAndObject(output,object); + kryo.writeClassAndObject(output, object); return output.toBytes(); } @Test - public void testBitSet(){ + public void testBitSet() { BitSet bitSet = new BitSet(); - bitSet.set(0,true); // 1 - bitSet.set(1,false); // 0 - bitSet.set(2,true); // 1 - LOG.info("Bit Set Size: {}",bitSet.size()); - LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray()); - LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray()); - LOG.info("BitSet[0]: {}",bitSet.get(0)); - LOG.info("BitSet[1]: {}",bitSet.get(1)); - LOG.info("BitSet[1]: {}",bitSet.get(2)); + bitSet.set(0, true); // 1 + bitSet.set(1, false); // 0 + bitSet.set(2, true); // 1 + LOG.info("Bit Set Size: {}", bitSet.size()); + LOG.info("Bit Set Byte[]: {}", bitSet.toByteArray()); + LOG.info("Bit Set Byte[]: {}", bitSet.toLongArray()); + LOG.info("BitSet[0]: {}", bitSet.get(0)); + LOG.info("BitSet[1]: {}", bitSet.get(1)); + LOG.info("BitSet[1]: {}", bitSet.get(2)); byte[] bytes = bitSet.toByteArray(); BitSet bitSet2 = BitSet.valueOf(bytes); - LOG.info("Bit Set Size: {}",bitSet2.size()); - LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray()); - LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray()); - LOG.info("BitSet[0]: {}",bitSet2.get(0)); - LOG.info("BitSet[1]: {}",bitSet2.get(1)); - LOG.info("BitSet[1]: {}",bitSet2.get(2)); + LOG.info("Bit Set Size: {}", bitSet2.size()); + LOG.info("Bit Set Byte[]: {}", bitSet2.toByteArray()); + LOG.info("Bit Set Byte[]: {}", bitSet2.toLongArray()); + LOG.info("BitSet[0]: {}", bitSet2.get(0)); + LOG.info("BitSet[1]: {}", bitSet2.get(1)); + LOG.info("BitSet[1]: {}", bitSet2.get(2)); BitSet bitSet3 = new BitSet(); - bitSet3.set(0,true); - Assert.assertEquals(1,bitSet3.length()); + bitSet3.set(0, true); + Assert.assertEquals(1, bitSet3.length()); BitSet bitSet4 = new BitSet(); - bitSet4.set(0,false); - Assert.assertEquals(0,bitSet4.length()); + bitSet4.set(0, false); + Assert.assertEquals(0, bitSet4.length()); Assert.assertFalse(bitSet4.get(1)); Assert.assertFalse(bitSet4.get(2)); } @Test - public void testPeriod(){ - Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m"))); - Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000))); - Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString()); + public void testPeriod() { + Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m"))); + Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30 * 60 * 1000))); + Assert.assertEquals("PT1800S", Period.millis(30 * 60 * 1000).toString()); } @Test - public void testPartitionType(){ + public void testPartitionType() { } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java index 3d373b6..9520b62 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java @@ -32,32 +32,31 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * @since Jun 21, 2016 - * */ public class SiddhiPolicyTest { - + private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class); private String streams = " define stream syslog_stream(" - + "dims_facility string, " - + "dims_severity string, " - + "dims_hostname string, " - + "dims_msgid string, " - + "timestamp string, " - + "conn string, " - + "op string, " - + "msgId string, " - + "command string, " - + "name string, " - + "namespace string, " - + "epochMillis long); "; + + "dims_facility string, " + + "dims_severity string, " + + "dims_hostname string, " + + "dims_msgid string, " + + "timestamp string, " + + "conn string, " + + "op string, " + + "msgId string, " + + "command string, " + + "name string, " + + "namespace string, " + + "epochMillis long); "; private SiddhiManager sm; - + @Before public void setup() { sm = new SiddhiManager(); } - + @After public void shutdown() { sm.shutdown(); @@ -70,7 +69,9 @@ public class SiddhiPolicyTest { @Override public void receive(Event[] arg0) { - }; + } + + ; }; String executionPlan = streams + ql; @@ -83,13 +84,13 @@ public class SiddhiPolicyTest { @Test public void testPolicy_agg() throws Exception { String sql = " from syslog_stream#window.time(1min) select " - + "name, " - + "namespace, " - + "timestamp, " - + "dims_hostname, " - + "count(*) as abortCount " - + "group by dims_hostname " - + "having abortCount > 3 insert into syslog_severity_check_output; "; + + "name, " + + "namespace, " + + "timestamp, " + + "dims_hostname, " + + "count(*) as abortCount " + + "group by dims_hostname " + + "having abortCount > 3 insert into syslog_severity_check_output; "; final AtomicBoolean checked = new AtomicBoolean(false); StreamCallback sc = new StreamCallback() { @@ -107,7 +108,9 @@ public class SiddhiPolicyTest { Assert.assertTrue(hosts.contains("HOSTNAME-" + 1)); Assert.assertTrue(hosts.contains("HOSTNAME-" + 2)); Assert.assertFalse(hosts.contains("HOSTNAME-" + 3)); - }; + } + + ; }; String executionPlan = streams + sql; @@ -124,7 +127,7 @@ public class SiddhiPolicyTest { runtime.shutdown(); } - + /* + "dims_facility string, " + "dims_severity string, " @@ -145,8 +148,8 @@ public class SiddhiPolicyTest { for (int i = 0; i < length; i++) { Event e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i%4 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + events[i] = e; } @@ -156,7 +159,7 @@ public class SiddhiPolicyTest { Event e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); handler.send(e); } @@ -164,28 +167,30 @@ public class SiddhiPolicyTest { @Test public void testPolicy_regex() throws Exception { String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; "; - + AtomicBoolean checked = new AtomicBoolean(); StreamCallback sc = new StreamCallback() { @Override public void receive(Event[] arg0) { checked.set(true); - }; + } + + ; }; String executionPlan = streams + sql; ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan); - runtime.addCallback("syslog_severity_check_output", sc); + runtime.addCallback("syslog_severity_check_output", sc); runtime.start(); - + InputHandler handler = runtime.getInputHandler("syslog_stream"); - + sendInput(handler); - + Thread.sleep(1000); - + Assert.assertTrue(checked.get()); - + runtime.shutdown(); } @@ -193,17 +198,19 @@ public class SiddhiPolicyTest { @Test public void testPolicy_seq() throws Exception { String sql = "" - + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> " - + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min " - + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op " - + " insert into syslog_severity_check_output; "; + + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> " + + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min " + + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op " + + " insert into syslog_severity_check_output; "; AtomicBoolean checked = new AtomicBoolean(); StreamCallback sc = new StreamCallback() { @Override public void receive(Event[] arg0) { checked.set(true); - }; + } + + ; }; String executionPlan = streams + sql; @@ -224,21 +231,21 @@ public class SiddhiPolicyTest { // validate one Event e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); Thread.sleep(61 * 1000); e = new Event(12); e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); + e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); handler.send(e); } @@ -246,7 +253,7 @@ public class SiddhiPolicyTest { @Test public void testStrConcat() throws Exception { String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " + - " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; "; + " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; "; SiddhiManager manager = new SiddhiManager(); ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql); runtime.addCallback("output", new StreamCallback() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java index f2f3b46..7694623 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java @@ -18,10 +18,6 @@ */ package org.apache.eagle.alert.engine.siddhi.extension; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; - import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,9 +27,12 @@ import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.stream.input.InputHandler; import org.wso2.siddhi.core.stream.output.StreamCallback; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + /** * @since Apr 1, 2016 - * */ public class AttributeCollectAggregatorTest { @@ -85,59 +84,59 @@ public class AttributeCollectAggregatorTest { Event e = null; long base = System.currentTimeMillis(); { - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); base += 100; events.add(e); } { - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); base += 100; events.add(e); } base += 10000; { - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); base += 100; events.add(e); } { - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); base += 100; events.add(e); - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); base += 100; events.add(e); } base += 10000; - e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" }); + e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"}); return events.toArray(new Event[0]); } - + @Test public void testQuery() { String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";