http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java deleted file mode 100644 index 533e486..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.e2e; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Properties; - -/** - * Since 6/29/16. - */ -public class SampleClient5AbsenceAlert { - private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class); - private static long currentTimestamp = 1467240000000L; - private static long interval = 3000L; - - public static void main(String[] args) throws Exception { - System.setProperty("config.resource", "/absence/application-absence.conf"); - ConfigFactory.invalidateCaches(); - - Config config = ConfigFactory.load(); - KafkaProducer producer = createProducer(config); - ProducerRecord record = null; - record = new ProducerRecord("absenceAlertTopic", createEvent("job1")); - producer.send(record); - record = new ProducerRecord("absenceAlertTopic", createEvent("job2")); - producer.send(record); - record = new ProducerRecord("absenceAlertTopic", createEvent("host3")); - producer.send(record); - } - - private static class AbsenceEvent { - @JsonProperty - long timestamp; - @JsonProperty - String jobID; - @JsonProperty - String status; - - public String toString() { - return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status; - } - } - - private static String createEvent(String jobID) throws Exception { - AbsenceEvent e = new AbsenceEvent(); - long expectTS = currentTimestamp + interval; - // adjust back 1 second random - long adjust = Math.round(2 * Math.random()); - e.timestamp = expectTS - adjust; - e.jobID = jobID; - e.status = "running"; - LOG.info("sending event {} ", e); - ObjectMapper mapper = new ObjectMapper(); - String value = mapper.writeValueAsString(e); - return value; - } - - - public static KafkaProducer<String, String> createProducer(Config config) { - String servers = config.getString("kafkaProducer.bootstrapServers"); - Properties configMap = new Properties(); - configMap.put("bootstrap.servers", servers); - configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("request.required.acks", "1"); - configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap); - return proceduer; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java deleted file mode 100755 index 4552417..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class AlertBoltOutputCollectorThreadSafeWrapperTest { - @Test - public void testThreadSafeAlertBoltOutputCollector() { - MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null); - AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(new StormOutputCollector(stormOutputCollector)); - alertBoltOutputCollectorWrapper.emit(create("mockAlert_1")); - alertBoltOutputCollectorWrapper.emit(create("mockAlert_2")); - Assert.assertEquals(0, stormOutputCollector.getCollected().size()); - Assert.assertEquals(0, stormOutputCollector.getTupleSize()); - alertBoltOutputCollectorWrapper.flush(); - Assert.assertEquals(2, stormOutputCollector.getCollected().size()); - Assert.assertEquals(2, stormOutputCollector.getTupleSize()); - alertBoltOutputCollectorWrapper.emit(create("mockAlert_3")); - Assert.assertEquals(2, stormOutputCollector.getCollected().size()); - Assert.assertEquals(2, stormOutputCollector.getTupleSize()); - alertBoltOutputCollectorWrapper.flush(); - alertBoltOutputCollectorWrapper.flush(); - alertBoltOutputCollectorWrapper.flush(); - Assert.assertEquals(3, stormOutputCollector.getCollected().size()); - Assert.assertEquals(3, stormOutputCollector.getTupleSize()); - } - - private AlertStreamEvent create(String streamId) { - AlertStreamEvent alert = new AlertStreamEvent(); - alert.setCreatedBy(this.toString()); - alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {"field_1", 2, "field_3"}); - alert.setStreamId(streamId); - return alert; - } - - private class MockedStormAlertOutputCollector extends OutputCollector { - private final Map<Object, List<Object>> collected; - - MockedStormAlertOutputCollector(IOutputCollector delegate) { - super(delegate); - collected = new HashMap<>(); - } - - @Override - public List<Integer> emit(String streamId, List<Object> tuple) { - if (!collected.containsKey(tuple.get(0))) { - collected.put(tuple.get(0), new LinkedList<>()); - } - collected.get(tuple.get(0)).add(tuple); - return null; - } - - Map<Object, List<Object>> getCollected() { - return collected; - } - - int getTupleSize() { - int size = 0; - for (List<Object> alerts : collected.values()) { - size += alerts.size(); - } - return size; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java deleted file mode 100644 index 1e65edd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; -import java.util.Map; - -/** - * Created on 9/7/16. - */ -public class PoilcyExtendedTest { - - private static final ObjectMapper mapper = new ObjectMapper(); - - @Test - public void test() throws Exception { - ArrayNode arrayNode = (ArrayNode) - mapper.readTree(PoilcyExtendedTest.class.getResourceAsStream("/extend_policy.json")); - Assert.assertEquals(1, arrayNode.size()); - for (JsonNode node : arrayNode) { - PolicyDefinition definition = mapper.treeToValue(node, PolicyDefinition.class); - - Assert.assertNotNull(definition); - Assert.assertNotNull(definition.getName()); - Assert.assertNotNull(definition.getDefinition()); - - Assert.assertEquals(PolicyStreamHandlers.CUSTOMIZED_ENGINE, definition.getDefinition().getType()); - Assert.assertNotNull(definition.getDefinition().getProperties()); - - Assert.assertTrue(definition.getDefinition().getProperties().containsKey("parentKey")); - Map pkSetting = (Map) definition.getDefinition().getProperties().get("parentKey"); - Assert.assertTrue(pkSetting.containsKey("syslogStream")); - - Map syslogStreamSetting = (Map) pkSetting.get("syslogStream"); - Assert.assertTrue(syslogStreamSetting.containsKey("pattern")); - Assert.assertEquals("%s-%s", syslogStreamSetting.get("pattern")); - - Assert.assertTrue(syslogStreamSetting.containsKey("columns")); - Assert.assertEquals(3, ((List) syslogStreamSetting.get("columns")).size()); - - break; - } - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java deleted file mode 100755 index 89039f5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import backtype.storm.metric.api.MultiCountMetric; -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.StormMultiCountMetric; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler; -import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; -import org.apache.eagle.alert.engine.mock.MockStreamCollector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -public class SiddhiCEPPolicyEventHandlerTest { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiCEPPolicyEventHandlerTest.class); - - private Map<String, StreamDefinition> createDefinition(String... streamIds) { - Map<String, StreamDefinition> sds = new HashMap<>(); - for (String streamId : streamIds) { - // construct StreamDefinition - StreamDefinition sd = MockSampleMetadataFactory.createSampleStreamDefinition(streamId); - sds.put(streamId, sd); - } - return sds; - } - - @SuppressWarnings("serial") - @Test - public void testBySendSimpleEvent() throws Exception { - SiddhiPolicyHandler handler; - MockStreamCollector collector; - - handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1", "sampleStream_2"), 0); - collector = new MockStreamCollector(); - PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy(); - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyDefinition(policyDefinition); - context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric())); - context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId")); - handler.prepare(collector, context); - StreamEvent event = StreamEvent.builder() - .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1")) - .streamId("sampleStream_1") - .timestamep(System.currentTimeMillis()) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("value", 60.0); - put("bad", "bad column value"); - }}).build(); - handler.send(event); - handler.close(); - } - - @SuppressWarnings("serial") - @Test - public void testWithTwoStreamJoinPolicy() throws Exception { - Map<String, StreamDefinition> ssd = createDefinition("sampleStream_1", "sampleStream_2"); - - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("SampleJoinPolicyForTest"); - policyDefinition.setInputStreams(Arrays.asList("sampleStream_1", "sampleStream_2")); - policyDefinition.setOutputStreams(Collections.singletonList("joinedStream")); - policyDefinition.setDefinition(new PolicyDefinition.Definition(PolicyStreamHandlers.SIDDHI_ENGINE, - "from sampleStream_1#window.length(10) as left " + - "join sampleStream_2#window.length(10) as right " + - "on left.name == right.name and left.value == right.value " + - "select left.timestamp,left.name,left.value " + - "insert into joinedStream")); - policyDefinition.setPartitionSpec(Collections.singletonList(MockSampleMetadataFactory.createSampleStreamGroupbyPartition("sampleStream_1", Collections.singletonList("name")))); - SiddhiPolicyHandler handler; - Semaphore mutex = new Semaphore(0); - List<AlertStreamEvent> alerts = new ArrayList<>(0); - Collector<AlertStreamEvent> collector = (event) -> { - LOG.info("Collected {}", event); - Assert.assertTrue(event != null); - alerts.add(event); - mutex.release(); - }; - - handler = new SiddhiPolicyHandler(ssd, 0); - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyDefinition(policyDefinition); - context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric())); - context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId")); - handler.prepare(collector, context); - - - long ts_1 = System.currentTimeMillis(); - long ts_2 = System.currentTimeMillis() + 1; - - handler.send(StreamEvent.builder() - .schema(ssd.get("sampleStream_1")) - .streamId("sampleStream_1") - .timestamep(ts_1) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("value", 60.0); - put("bad", "bad column value"); - }}).build()); - - handler.send(StreamEvent.builder() - .schema(ssd.get("sampleStream_2")) - .streamId("sampleStream_2") - .timestamep(ts_2) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("value", 61.0); - }}).build()); - - handler.send(StreamEvent.builder() - .schema(ssd.get("sampleStream_2")) - .streamId("sampleStream_2") - .timestamep(ts_2) - .attributes(new HashMap<String, Object>() {{ - put("name", "disk"); - put("value", 60.0); - }}).build()); - - handler.send(StreamEvent.builder() - .schema(ssd.get("sampleStream_2")) - .streamId("sampleStream_2") - .timestamep(ts_2) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("value", 60.0); - }}).build()); - - handler.close(); - - Assert.assertTrue("Should get result in 5 s", mutex.tryAcquire(5, TimeUnit.SECONDS)); - Assert.assertEquals(1, alerts.size()); - Assert.assertEquals("joinedStream", alerts.get(0).getStreamId()); - Assert.assertEquals("cpu", alerts.get(0).getData()[1]); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java deleted file mode 100644 index 9febed5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.StreamCounter; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.router.StreamOutputCollector; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; - -import static org.mockito.Mockito.*; - -public class AlertBoltOutputCollectorWrapperTest { - - private AlertBoltOutputCollectorWrapper alertBoltOutputCollectorWrapper; - - // mock objects - private StreamOutputCollector outputCollector; - private Object outputLock; - private StreamContext streamContext; - private StreamCounter streamCounter; - - private Set<PublishPartition> publishPartitions = new HashSet<>(); - - private static final String samplePublishId = "samplePublishId"; - private static final String samplePublishId2 = "samplePublishId2"; - private static final String samplePolicyId = "samplePolicyId"; - private static final String sampleStreamId = "sampleStreamId"; - private static final String sampleStreamId2 = "sampleStreamId2"; - - @Before - public void setUp() throws Exception { - outputCollector = mock(StreamOutputCollector.class); - outputLock = mock(Object.class); - streamContext = mock(StreamContext.class); - streamCounter = mock(StreamCounter.class); - alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorWrapper(outputCollector, outputLock, streamContext); - } - - @Before - public void tearDown() throws Exception { - alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(new HashSet<>(), publishPartitions, new HashSet<>()); - publishPartitions.clear(); - } - - @Test - public void testNormal() throws Exception { - doReturn(streamCounter).when(streamContext).counter(); - - publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); - publishPartitions.add(createPublishPartition(samplePublishId2, samplePolicyId, sampleStreamId2)); - alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>()); - - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(samplePolicyId); - StreamDefinition sd = new StreamDefinition(); - sd.setStreamId(sampleStreamId); - sd.setColumns(new ArrayList<>()); - event.setSchema(sd); - - alertBoltOutputCollectorWrapper.emit(event); - - verify(streamCounter, times(1)).incr(anyString()); - verify(outputCollector, times(1)).emit(anyObject()); - } - - @Test - public void testExceptional() throws Exception { - doReturn(streamCounter).when(streamContext).counter(); - - publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); - publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); - alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>()); - - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(samplePolicyId); - StreamDefinition sd = new StreamDefinition(); - sd.setStreamId(sampleStreamId); - sd.setColumns(new ArrayList<>()); - event.setSchema(sd); - - alertBoltOutputCollectorWrapper.emit(event); - - verify(streamCounter, times(1)).incr(anyString()); - verify(outputCollector, times(1)).emit(anyObject()); - } - - private PublishPartition createPublishPartition(String publishId, String policyId, String streamId) { - PublishPartition publishPartition = new PublishPartition(); - publishPartition.setPolicyId(policyId); - publishPartition.setStreamId(streamId); - publishPartition.setPublishId(publishId); - publishPartition.setColumns(new HashSet<>()); - return publishPartition; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java deleted file mode 100644 index 2d3ee85..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.integration; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.service.IMetadataServiceClient; - -import java.io.IOException; -import java.util.List; - -@SuppressWarnings("serial") -public class MockMetadataServiceClient implements IMetadataServiceClient { - - @Override - public List<SpoutSpec> listSpoutMetadata() { - return null; - } - - @Override - public void close() throws IOException { - - } - - @Override - public ScheduleState getVersionedSpec(String version) { - return null; - } - - @Override - public List<StreamingCluster> listClusters() { - return null; - } - - @Override - public List<PolicyDefinition> listPolicies() { - return null; - } - - @Override - public List<StreamDefinition> listStreams() { - return null; - } - - @Override - public List<Kafka2TupleMetadata> listDataSources() { - return null; - } - - @Override - public List<Publishment> listPublishment() { - return null; - } - - @Override - public ScheduleState getVersionedSpec() { - return null; - } - - @Override - public void addScheduleState(ScheduleState state) { - - } - - @Override - public List<Topology> listTopologies() { - return null; - } - - @Override - public void addStreamingCluster(StreamingCluster cluster) { - - } - - @Override - public void addStreamingClusters(List<StreamingCluster> clusters) { - - } - - @Override - public void addTopology(Topology t) { - - } - - @Override - public void addTopologies(List<Topology> topologies) { - - } - - @Override - public void addPolicy(PolicyDefinition policy) { - - } - - @Override - public void addPolicies(List<PolicyDefinition> policies) { - - } - - @Override - public void addStreamDefinition(StreamDefinition streamDef) { - - } - - @Override - public void addStreamDefinitions(List<StreamDefinition> streamDefs) { - - } - - @Override - public void addDataSource(Kafka2TupleMetadata k2t) { - - } - - @Override - public void addDataSources(List<Kafka2TupleMetadata> k2ts) { - - } - - @Override - public void addPublishment(Publishment pub) { - - } - - @Override - public void addPublishments(List<Publishment> pubs) { - - } - - @Override - public void clear() { - - } - - @Override - public void clearScheduleState(int maxCapacity) { - - } - - @Override - public List<AlertPublishEvent> listAlertPublishEvent() { - return null; - } - - @Override - public void addAlertPublishEvent(AlertPublishEvent event) { - - } - - @Override - public void addAlertPublishEvents(List<AlertPublishEvent> events) { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java deleted file mode 100644 index 4047fc1..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.interpreter; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers; -import org.junit.Assert; -import org.junit.Test; -import org.wso2.siddhi.core.exception.DefinitionNotExistException; -import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; - -import java.util.*; - -public class PolicyInterpreterTest { - // ------------------------- - // Single Stream Test Cases - // ------------------------- - @Test - public void testParseSingleStreamPolicyQuery() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) " - + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT"); - Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); - Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]); - Assert.assertEquals(1, executionPlan.getStreamPartitions().size()); - Assert.assertEquals(2*60*1000,executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - } - - @Test - public void testParseSingleStreamPolicyWithPattern() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price] \n" - + "select e1.symbol as symbol, e2.price as price, e1.price+e2.price as total_price \n" - + "group by symbol, company insert into OutStream"); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream1")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream2")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("OutStream")); - Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(0).getType()); - Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(0).getColumns().toArray()); - Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(1).getType()); - Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(1).getColumns().toArray()); - } - - @Test - public void testParseSingleStreamPolicyQueryWithMultiplePartitionUsingLargerWindow() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 min) " - + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" - + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 hour) " - + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" - ); - Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); - Assert.assertEquals(1, executionPlan.getStreamPartitions().size()); - Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - } - - @Test(expected = ExecutionPlanValidationException.class) - public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) " - + "select cmd, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" - + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) " - + "select user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" - ); - Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); - Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); - Assert.assertEquals(5*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - } - - @Test - public void testValidPolicyWithExternalTimeWindow() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); - put("INPUT_STREAM_3", mockStreamDefinition("INPUT_STREAM_3")); - put("INPUT_STREAM_4", mockStreamDefinition("INPUT_STREAM_4")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - } - - @Test - public void testValidPolicyWithTimeWindow() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - } - - @Test - public void testValidPolicyWithTooManyInputStreams() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); - } - - @Test - public void testValidPolicyWithTooFewOutputStreams() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue( - "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;" - + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;" - ); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); - } - - @Test - public void testInvalidPolicyForSyntaxError() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM", mockStreamDefinition("INPUT_STREAM")); - } - }); - Assert.assertFalse(validation.isSuccess()); - } - - @Test - public void testInvalidPolicyForNotDefinedInputStream() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); - } - }); - Assert.assertFalse(validation.isSuccess()); - } - - @Test - public void testInvalidPolicyForNotDefinedOutputStream() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - } - }); - Assert.assertFalse(validation.isSuccess()); - } - - // --------------------- - // Two Stream Test Cases - // --------------------- - - @Test - public void testParseTwoStreamPolicyQueryWithMultiplePartition() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) " - + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" - + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2#window.externalTime(timestamp, 1 hour) " - + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" - ); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); - Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); - Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(1).getSortSpec().getWindowPeriodMillis()); - } - - @Test - public void testParseTwoStreamPolicyQueryWithSinglePartition() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) " - + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" - + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2 select * insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" - ); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); - Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); - Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType()); - } - - - @Test - public void testParseTwoStreamPolicyQueryInnerJoin() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from TickEvent[symbol=='EBAY']#window.length(2000) " + - "join NewsEvent#window.externalTime(timestamp, 1000 sec) \n" + - "select * insert into JoinStream" - ); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); - Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); - Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType()); - Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); - } - - @Test - public void testParseTwoStreamPolicyQueryInnerJoinWithCondition() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + - "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + - "on TickEvent.symbol == NewsEvent.company \n" + - "insert into JoinStream " - ); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); - Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); - Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType()); - Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); - } - - @Test - public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingAlias() throws Exception { - PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( - "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + - "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + - "on t.symbol == n.company \n" + - "insert into JoinStream " - ); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); - Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); - Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); - Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); - Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType()); - Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); - } - - @Test(expected = DefinitionNotExistException.class) - public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingNotFoundAlias() throws Exception { - PolicyInterpreter.parseExecutionPlan( - "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + - "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + - "on t.symbol == NOT_EXIST_ALIAS.company \n" + - "insert into JoinStream " - ); - } - - private static final ObjectMapper mapper = new ObjectMapper(); - - @Test - public void testLeftJoin() throws Exception { - PolicyDefinition def = mapper.readValue(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/policy.json"), PolicyDefinition.class); - ArrayNode array = (ArrayNode)mapper.readTree(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/streams.json")); - Map<String, StreamDefinition> allDefinitions = new HashMap<>(); - for(JsonNode node : array) { - StreamDefinition streamDef = mapper.readValue(node.toString(), StreamDefinition.class); - allDefinitions.put(streamDef.getStreamId(), streamDef); - } - PolicyValidationResult result = PolicyInterpreter.validate(def, allDefinitions); - Assert.assertTrue(result.isSuccess()); - } - - @Test - public void testExtendPolicy() throws Exception { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test-extend-policy"); - policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE); - policyDefinition.setDefinition(definition); - - Map<String, StreamDefinition> allDefinitions = new HashMap<>(); - allDefinitions.put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); - PolicyValidationResult result = PolicyInterpreter.validate(policyDefinition, allDefinitions); - Assert.assertTrue(result.isSuccess()); - } - - - // -------------- - // Helper Methods - // -------------- - - private static StreamDefinition mockStreamDefinition(String streamId) { - StreamDefinition streamDefinition = new StreamDefinition(); - streamDefinition.setStreamId(streamId); - List<StreamColumn> columns = new ArrayList<>(); - columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); - columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); - columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build()); - streamDefinition.setColumns(columns); - return streamDefinition; - } - - @Test - public void testValidPolicyWithPattern() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - String policy = - "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " + - "-> b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + - "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " + - "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; - definition.setValue(policy); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); - Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); - Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); - } - - @Test - public void testValidPolicyWithPatternSort() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - String policy = - "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " + - "select * group by site, host, component, metric insert into temp;\n" + - "\n" + - "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"] -> b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + - "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " + - "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; - definition.setValue(policy); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); - Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); - Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); - } - - @Test - public void testValidPolicyWithSequence() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - String policy = - "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " + - ", b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + - "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " + - "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; - definition.setValue(policy); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); - Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); - Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); - } - - @Test - public void testValidPolicyWithSequenceSort() { - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("test_policy"); - policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); - policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType("siddhi"); - String policy = - "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " + - "select * group by site, host, component, metric insert into temp;\n" + - "\n" + - "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"], b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + - "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " + - "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; - definition.setValue(policy); - definition.setInputStreams(policyDefinition.getInputStreams()); - definition.setOutputStreams(policyDefinition.getOutputStreams()); - policyDefinition.setDefinition(definition); - - PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { - { - put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); - } - }); - Assert.assertTrue(validation.isSuccess()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); - Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); - Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); - Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); - Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); - Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin()); - Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); - Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); - Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java deleted file mode 100644 index 2305a0f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.apache.eagle.alert.engine.metric; - -import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class MemoryUsageGaugeSetTest { - private final Logger LOG = LoggerFactory.getLogger(MemoryUsageGaugeSetTest.class); - - @Test - public void testJVMMetrics() throws InterruptedException { - LOG.info("Starting testJVMMetrics"); - final MetricRegistry metrics = new MetricRegistry(); - ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - metrics.registerAll(new MemoryUsageGaugeSet()); - metrics.register("sample", (Gauge<Double>) () -> 0.1234); - reporter.start(1, TimeUnit.SECONDS); - reporter.close(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java deleted file mode 100644 index 09f01e4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.eagle.alert.engine.mock; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class MockPartitionedCollector implements PartitionedEventCollector { - @SuppressWarnings("unused") - private final static Logger LOG = LoggerFactory.getLogger(MockPartitionedCollector.class); - private List<PartitionedEvent> cache; - - public MockPartitionedCollector() { - cache = new LinkedList<>(); - } - - public void emit(PartitionedEvent event) { - cache.add(event); - } - - public void clear() { - cache.clear(); - } - - public List<PartitionedEvent> get() { - return cache; - } - - public int size() { - return cache.size(); - } - - @Override - public void drop(PartitionedEvent event) { - - } -} \ No newline at end of file
