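[EAGLE-749] Add unit tests for model classes. - Add unit tests for the model classes under the alert-common module. - Fix inconsistent equals()/hashCode() implementations in PartitionedEvent, StreamEvent, and Publishment. - Fix StreamColumn default-value parsing for the BOOL type (it used Double.valueOf instead of Boolean.valueOf).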
https://issues.apache.org/jira/browse/EAGLE-749 Author: r7raul1984 <tangji...@yhd.com> Closes #632 from r7raul1984/EAGLE-749. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2b61cef5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2b61cef5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2b61cef5 Branch: refs/heads/master Commit: 2b61cef585b02fb6f40d5c3de7f7c5c060926ecd Parents: 1da8dc4 Author: r7raul1984 <tangji...@yhd.com> Authored: Wed Nov 9 21:55:51 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Wed Nov 9 21:55:51 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/Publishment.java | 21 +- .../alert/engine/coordinator/StreamColumn.java | 2 +- .../engine/coordinator/StreamSortSpec.java | 17 +- .../alert/engine/model/PartitionedEvent.java | 16 +- .../eagle/alert/engine/model/StreamEvent.java | 11 +- .../eagle/alert/config/TestConfigBus.java | 5 +- .../model/Kafka2TupleMetadataTest.java | 49 +++++ .../model/PolicyWorkerQueueTest.java | 63 ++++++ .../model/StreamRepartitionStrategyTest.java | 74 +++++++ .../model/StreamRouterSpecTest.java | 59 ++++++ .../model/Tuple2StreamMetadataTest.java | 50 +++++ .../alert/coordination/model/WorkSlotTest.java | 43 +++++ .../model/internal/MonitoredStreamTest.java | 69 +++++++ .../model/internal/PolicyAssignmentTest.java | 37 ++++ .../model/internal/StreamGroupTest.java | 67 +++++++ .../model/internal/StreamWorkSlotQueueTest.java | 61 ++++++ .../model/internal/TopologyTest.java | 47 +++++ .../OverrideDeduplicatorSpecTest.java | 61 ++++++ .../coordinator/PolicyDefinitionTest.java | 140 ++++++++++++++ .../engine/coordinator/PublishmentTest.java | 100 ++++++++++ .../engine/coordinator/PublishmentTypeTest.java | 47 +++++ .../engine/coordinator/StreamColumnTest.java | 153 +++++++++++++++ .../coordinator/StreamDefinitionTest.java | 52 +++++ 
.../engine/coordinator/StreamPartitionTest.java | 43 +++++ .../engine/coordinator/StreamSortSpecTest.java | 45 +++++ .../coordinator/StreamingClusterTest.java | 47 +++++ .../engine/model/AlertPublishEventTest.java | 110 +++++++++++ .../engine/model/AlertStreamEventTest.java | 63 ++++++ .../engine/model/PartitionedEventTest.java | 54 ++++++ .../engine/model/StreamEventBuilderTest.java | 166 ++++++++++++++++ .../alert/engine/model/StreamEventTest.java | 191 +++++++++++++++++++ .../eagle/alert/model/StreamEventTest.java | 68 ------- .../eagle/alert/model/TestPolicyDefinition.java | 45 ----- 33 files changed, 1930 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java index dbb1844..d1cc33a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java @@ -135,28 +135,29 @@ public class Publishment { if (obj instanceof Publishment) { Publishment p = (Publishment) obj; return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType()) - && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) - && Objects.equals(dedupFields, p.getDedupFields()) - && Objects.equals(dedupStateField, p.getDedupStateField()) - && Objects.equals(overrideDeduplicator, 
p.getOverrideDeduplicator()) - && Objects.equals(policyIds, p.getPolicyIds()) - && Objects.equals(streamIds, p.getStreamIds()) - && properties.equals(p.getProperties())); + && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) + && Objects.equals(dedupFields, p.getDedupFields()) + && Objects.equals(dedupStateField, p.getDedupStateField()) + && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator()) + && Objects.equals(policyIds, p.getPolicyIds()) + && Objects.equals(streamIds, p.getStreamIds()) + && properties.equals(p.getProperties())); } return false; } @Override public int hashCode() { - return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds) - .append(properties).build(); + return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields) + .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds) + .append(properties).build(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:") - .append(policyIds).append(",properties:").append(properties); + .append(policyIds).append(",properties:").append(properties); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java index 5a5f2cc..4628043 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java @@ -85,7 +85,7 @@ public class StreamColumn implements Serializable { this.setDefaultValue(Double.valueOf((String) this.getDefaultValue())); break; case BOOL: - this.setDefaultValue(Double.valueOf((String) this.getDefaultValue())); + this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue())); break; case OBJECT: try { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java index 65b9151..ff05fc8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.alert.engine.coordinator; +import org.apache.commons.lang.StringUtils; import org.apache.eagle.alert.utils.TimePeriodUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang.builder.HashCodeBuilder; @@ -45,7 +46,7 @@ public class StreamSortSpec implements Serializable { } public int getWindowPeriodMillis() { - if (windowPeriod != null) { + if (StringUtils.isNotBlank(windowPeriod)) { return 
TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod)); } else { return 0; @@ -76,9 +77,9 @@ public class StreamSortSpec implements Serializable { @Override public int hashCode() { return new HashCodeBuilder() - .append(windowPeriod) - .append(windowMargin) - .toHashCode(); + .append(windowPeriod) + .append(windowMargin) + .toHashCode(); } @Override @@ -92,14 +93,14 @@ public class StreamSortSpec implements Serializable { StreamSortSpec another = (StreamSortSpec) that; return - another.windowPeriod.equals(this.windowPeriod) - && another.windowMargin == this.windowMargin; + another.windowPeriod.equals(this.windowPeriod) + && another.windowMargin == this.windowMargin; } @Override public String toString() { return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]", - this.getWindowPeriod(), - this.getWindowMargin()); + this.getWindowPeriod(), + this.getWindowMargin()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java index 51e4532..ecca0ff 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java @@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import backtype.storm.tuple.Tuple; import org.apache.commons.lang3.builder.HashCodeBuilder; + import 
java.io.Serializable; import java.util.Objects; @@ -63,9 +64,9 @@ public class PartitionedEvent implements Serializable { if (obj instanceof PartitionedEvent) { PartitionedEvent another = (PartitionedEvent) obj; return !(this.partitionKey != another.getPartitionKey() - || !Objects.equals(this.event, another.getEvent()) - || !Objects.equals(this.partition, another.getPartition()) - || !Objects.equals(this.anchor, another.anchor)); + || !Objects.equals(this.event, another.getEvent()) + || !Objects.equals(this.partition, another.getPartition()) + || !Objects.equals(this.anchor, another.anchor)); } else { return false; } @@ -74,10 +75,11 @@ public class PartitionedEvent implements Serializable { @Override public int hashCode() { return new HashCodeBuilder() - .append(partitionKey) - .append(event) - .append(partition) - .build(); + .append(partitionKey) + .append(event) + .append(partition) + .append(anchor) + .build(); } public StreamEvent getEvent() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java index 8480bc5..130985f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java @@ -95,7 +95,8 @@ public class StreamEvent implements Serializable { } if (obj instanceof StreamEvent) { StreamEvent another = (StreamEvent) obj; - return Objects.equals(this.streamId, another.streamId) && this.timestamp == 
another.timestamp && Arrays.deepEquals(this.data, another.data); + return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp + && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion); } return false; } @@ -113,10 +114,10 @@ public class StreamEvent implements Serializable { } } return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]", - this.getStreamId(), - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), - StringUtils.join(dataStrings, ","), - this.getMetaVersion()); + this.getStreamId(), + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), + StringUtils.join(dataStrings, ","), + this.getMetaVersion()); } public static StreamEventBuilder builder() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java index 5c3f35e..e37e9be 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.alert.config; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -69,8 +70,8 @@ public class TestConfigBus { } @After - public void shutdown() { - CloseableUtils.closeQuietly(server); + public void shutdown() throws IOException { + server.stop(); 
producer.close(); consumer.close(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java new file mode 100644 index 0000000..a252fae --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.coordination.model; + +import org.junit.Assert; +import org.junit.Test; + +public class Kafka2TupleMetadataTest { + @Test + public void testKafka2TupleMetadata() { + Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata(); + kafka2TupleMetadata.setName("setName"); + kafka2TupleMetadata.setCodec(new Tuple2StreamMetadata()); + kafka2TupleMetadata.setType("setType"); + kafka2TupleMetadata.setTopic("setTopic"); + kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme"); + + Kafka2TupleMetadata kafka2TupleMetadata1 = new Kafka2TupleMetadata(); + kafka2TupleMetadata1.setName("setName"); + kafka2TupleMetadata1.setCodec(new Tuple2StreamMetadata()); + kafka2TupleMetadata1.setType("setType"); + kafka2TupleMetadata1.setTopic("setTopic"); + kafka2TupleMetadata1.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme"); + + Assert.assertFalse(kafka2TupleMetadata1 == kafka2TupleMetadata); + Assert.assertTrue(kafka2TupleMetadata1.equals(kafka2TupleMetadata)); + Assert.assertTrue(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode()); + + kafka2TupleMetadata1.setType("setType1"); + + Assert.assertFalse(kafka2TupleMetadata1.equals(kafka2TupleMetadata)); + Assert.assertFalse(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java new file mode 100644 index 0000000..71f3188 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordination.model; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class PolicyWorkerQueueTest { + @Test + public void testPolicyWorkerQueue() { + + List<WorkSlot> workers = new ArrayList<>(); + WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1"); + WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2"); + workers.add(workSlot1); + workers.add(workSlot2); + PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers); + Assert.assertEquals(null, policyWorkerQueue.getPartition()); + Assert.assertEquals(workSlot1, policyWorkerQueue.getWorkers().get(0)); + Assert.assertEquals(workSlot2, policyWorkerQueue.getWorkers().get(1)); + Assert.assertEquals("[(setTopologyName1:setBoltId1),(setTopologyName1:setBoltId2)]", policyWorkerQueue.toString()); 
+ + PolicyWorkerQueue policyWorkerQueue1 = new PolicyWorkerQueue(); + policyWorkerQueue1.setWorkers(workers); + + Assert.assertTrue(policyWorkerQueue.equals(policyWorkerQueue1)); + Assert.assertTrue(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode()); + + StreamSortSpec streamSortSpec = new StreamSortSpec(); + streamSortSpec.setWindowPeriod("PT10S"); + StreamPartition streamPartition = new StreamPartition(); + List<String> columns = new ArrayList<>(); + columns.add("jobId"); + streamPartition.setColumns(columns); + streamPartition.setSortSpec(streamSortSpec); + streamPartition.setStreamId("test"); + streamPartition.setType(StreamPartition.Type.GROUPBY); + policyWorkerQueue1.setPartition(streamPartition); + + Assert.assertFalse(policyWorkerQueue.equals(policyWorkerQueue1)); + Assert.assertFalse(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java new file mode 100644 index 0000000..c416a49 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordination.model; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.List; + +public class StreamRepartitionStrategyTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testStreamRepartitionStrategy() { + thrown.expect(NullPointerException.class); + StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy(); + streamRepartitionStrategy.hashCode(); + } + + @Test + public void testStreamRepartitionStrategy1() { + thrown.expect(NullPointerException.class); + StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy(); + streamRepartitionStrategy.equals(streamRepartitionStrategy); + } + + @Test + public void testStreamRepartitionStrategy2() { + + StreamSortSpec streamSortSpec = new StreamSortSpec(); + streamSortSpec.setWindowPeriod("PT10S"); + StreamPartition streamPartition = new StreamPartition(); + List<String> columns = new ArrayList<>(); + columns.add("jobId"); + streamPartition.setColumns(columns); + streamPartition.setSortSpec(streamSortSpec); + streamPartition.setStreamId("test"); + 
streamPartition.setType(StreamPartition.Type.GROUPBY); + + + StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy(); + Assert.assertEquals(null, streamRepartitionStrategy.getPartition()); + Assert.assertEquals(0, streamRepartitionStrategy.getNumTotalParticipatingRouterBolts()); + Assert.assertEquals(0, streamRepartitionStrategy.getStartSequence()); + streamRepartitionStrategy.setPartition(streamPartition); + StreamRepartitionStrategy streamRepartitionStrategy1 = new StreamRepartitionStrategy(); + streamRepartitionStrategy1.setPartition(streamPartition); + + Assert.assertTrue(streamRepartitionStrategy.equals(streamRepartitionStrategy1)); + Assert.assertTrue(streamRepartitionStrategy.hashCode() == streamRepartitionStrategy1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java new file mode 100644 index 0000000..88e72cb --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordination.model; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class StreamRouterSpecTest { + @Test + public void testStreamRouterSpec() { + StreamRouterSpec streamRouterSpec = new StreamRouterSpec(); + Assert.assertEquals(null, streamRouterSpec.getPartition()); + Assert.assertEquals(null, streamRouterSpec.getStreamId()); + Assert.assertTrue(streamRouterSpec.getTargetQueue().isEmpty()); + + List<WorkSlot> workers = new ArrayList<>(); + WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1"); + WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2"); + workers.add(workSlot1); + workers.add(workSlot2); + PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers); + streamRouterSpec.addQueue(policyWorkerQueue); + streamRouterSpec.setStreamId("streamRouterSpec"); + + Assert.assertEquals("streamRouterSpec", streamRouterSpec.getStreamId()); + Assert.assertEquals(1, streamRouterSpec.getTargetQueue().size()); + Assert.assertEquals(2, streamRouterSpec.getTargetQueue().get(0).getWorkers().size()); + + StreamRouterSpec streamRouterSpec1 = new StreamRouterSpec(); + streamRouterSpec1.addQueue(policyWorkerQueue); + streamRouterSpec1.setStreamId("streamRouterSpec1"); + + Assert.assertFalse(streamRouterSpec.equals(streamRouterSpec1)); + + streamRouterSpec1.setStreamId("streamRouterSpec"); + + Assert.assertTrue(streamRouterSpec.equals(streamRouterSpec1)); + Assert.assertTrue(streamRouterSpec.hashCode() == 
streamRouterSpec1.hashCode()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java new file mode 100644 index 0000000..8bbfc41 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.coordination.model; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +public class Tuple2StreamMetadataTest { + @Test + public void testTuple2StreamMetadata() { + Tuple2StreamMetadata metadata = new Tuple2StreamMetadata(); + Set activeStreamNames = new HashSet<>(); + activeStreamNames.add("defaultStringStream"); + metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector"); + metadata.setStreamNameSelectorProp(new Properties()); + metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream"); + metadata.setActiveStreamNames(activeStreamNames); + metadata.setTimestampColumn("timestamp"); + + Tuple2StreamMetadata metadata1 = new Tuple2StreamMetadata(); + Set activeStreamNames1 = new HashSet<>(); + activeStreamNames1.add("defaultStringStream"); + metadata1.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector"); + metadata1.setStreamNameSelectorProp(new Properties()); + metadata1.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream"); + metadata1.setActiveStreamNames(activeStreamNames1); + metadata1.setTimestampColumn("timestamp"); + + Assert.assertFalse(metadata == metadata1); + Assert.assertFalse(metadata.equals(metadata1)); + Assert.assertFalse(metadata.hashCode() == metadata1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java new file mode 100644 index 
0000000..48ee73b --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordination.model; + +import org.junit.Assert; +import org.junit.Test; + +public class WorkSlotTest { + @Test + public void testWorkSlot() { + WorkSlot workSlot = new WorkSlot(); + Assert.assertEquals("(null:null)", workSlot.toString()); + Assert.assertEquals(null, workSlot.getBoltId()); + Assert.assertEquals(null, workSlot.getTopologyName()); + workSlot.setBoltId("setBoltId"); + workSlot.setTopologyName("setTopologyName"); + Assert.assertEquals("(setTopologyName:setBoltId)", workSlot.toString()); + Assert.assertEquals("setBoltId", workSlot.getBoltId()); + Assert.assertEquals("setTopologyName", workSlot.getTopologyName()); + + WorkSlot workSlot1 = new WorkSlot("setTopologyName", "setBoltId"); + Assert.assertEquals("(setTopologyName:setBoltId)", workSlot1.toString()); + Assert.assertEquals("setBoltId", workSlot1.getBoltId()); + Assert.assertEquals("setTopologyName", workSlot1.getTopologyName()); + Assert.assertTrue(workSlot1.equals(workSlot)); + 
Assert.assertTrue(workSlot1.hashCode() == workSlot.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java new file mode 100644 index 0000000..a2c0d6e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.coordination.model.internal; + +import org.apache.eagle.alert.coordination.model.WorkSlot; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class MonitoredStreamTest { + + @Test + public void testMonitoredStream() { + + StreamGroup streamGroup = new StreamGroup(); + StreamSortSpec streamSortSpec = new StreamSortSpec(); + streamSortSpec.setWindowPeriod("PT10S"); + StreamPartition streamPartition = new StreamPartition(); + List<String> columns = new ArrayList<>(); + columns.add("jobId"); + streamPartition.setColumns(columns); + streamPartition.setSortSpec(streamSortSpec); + streamPartition.setStreamId("test"); + streamPartition.setType(StreamPartition.Type.GROUPBY); + streamGroup.addStreamPartition(streamPartition); + WorkSlot workSlot = new WorkSlot("setTopologyName", "setBoltId"); + List<WorkSlot> workSlots = new ArrayList<>(); + workSlots.add(workSlot); + StreamWorkSlotQueue streamWorkSlotQueue = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), workSlots); + + MonitoredStream monitoredStream = new MonitoredStream(streamGroup); + Assert.assertEquals(null, monitoredStream.getVersion()); + Assert.assertTrue(monitoredStream.getQueues().isEmpty()); + Assert.assertEquals(streamGroup, monitoredStream.getStreamGroup()); + monitoredStream.addQueues(streamWorkSlotQueue); + Assert.assertEquals(streamWorkSlotQueue, monitoredStream.getQueues().get(0)); + + MonitoredStream monitoredStream1 = new MonitoredStream(streamGroup); + Assert.assertTrue(monitoredStream.equals(monitoredStream1)); + Assert.assertTrue(monitoredStream.hashCode() == monitoredStream1.hashCode()); + + monitoredStream.removeQueue(streamWorkSlotQueue); + Assert.assertTrue(monitoredStream.getQueues().isEmpty()); + + 
Assert.assertTrue(monitoredStream.equals(monitoredStream1)); + Assert.assertTrue(monitoredStream.hashCode() == monitoredStream1.hashCode()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java new file mode 100644 index 0000000..1491c77 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.coordination.model.internal; + +import org.junit.Assert; +import org.junit.Test; + +public class PolicyAssignmentTest { + @Test + public void testPolicyAssignment() { + PolicyAssignment policyAssignment = new PolicyAssignment("policy", "queue"); + Assert.assertEquals("policy", policyAssignment.getPolicyName()); + Assert.assertEquals("queue", policyAssignment.getQueueId()); + Assert.assertEquals(null, policyAssignment.getVersion()); + Assert.assertEquals("PolicyAssignment of policy policy, queueId queue, version null !", policyAssignment.toString()); + + Assert.assertFalse(policyAssignment.equals(new PolicyAssignment("policy", "queue"))); + Assert.assertFalse(policyAssignment == new PolicyAssignment("policy", "queue")); + Assert.assertFalse(policyAssignment.hashCode() == new PolicyAssignment("policy", "queue").hashCode()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java new file mode 100644 index 0000000..d0f0189 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model.internal; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class StreamGroupTest { + @Test + public void testStreamGroup() { + StreamGroup streamGroup = new StreamGroup(); + Assert.assertEquals("StreamGroup partitions=: [] ", streamGroup.toString()); + Assert.assertEquals("SG[]", streamGroup.getStreamId()); + + StreamSortSpec streamSortSpec = new StreamSortSpec(); + streamSortSpec.setWindowPeriod("PT10S"); + StreamPartition streamPartition = new StreamPartition(); + List<String> columns = new ArrayList<>(); + columns.add("jobId"); + streamPartition.setColumns(columns); + streamPartition.setSortSpec(streamSortSpec); + streamPartition.setStreamId("test"); + streamPartition.setType(StreamPartition.Type.GROUPBY); + streamGroup.addStreamPartition(streamPartition); + Assert.assertEquals("SG[test-]", streamGroup.getStreamId()); + Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", streamGroup.toString()); + + List<StreamPartition> streamPartitions = new ArrayList<>(); + streamPartition.setStreamId("test1"); + 
streamPartitions.add(streamPartition); + streamGroup.addStreamPartitions(streamPartitions); + Assert.assertEquals("SG[test1-test1-]", streamGroup.getStreamId()); + + + streamPartitions = new ArrayList<>(); + StreamPartition streamPartition1 = new StreamPartition(); + streamPartition1.setStreamId("test2"); + streamPartitions.add(streamPartition1); + streamGroup.addStreamPartitions(streamPartitions); + Assert.assertEquals("SG[test1-test1-test2-]", streamGroup.getStreamId()); + Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", streamGroup.toString()); + + StreamGroup streamGroup1 = new StreamGroup(); + streamGroup1.addStreamPartitions(streamGroup.getStreamPartitions()); + Assert.assertTrue(streamGroup.equals(streamGroup1)); + Assert.assertTrue(streamGroup.hashCode() == streamGroup1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java new file mode 100644 index 0000000..bc2f74e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model.internal; + +import org.apache.eagle.alert.coordination.model.WorkSlot; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class StreamWorkSlotQueueTest { + @Test + public void testStreamWorkSlotQueue() { + StreamGroup streamGroup = new StreamGroup(); + StreamSortSpec streamSortSpec = new StreamSortSpec(); + streamSortSpec.setWindowPeriod("PT10S"); + StreamPartition streamPartition = new StreamPartition(); + List<String> columns = new ArrayList<>(); + columns.add("jobId"); + streamPartition.setColumns(columns); + streamPartition.setSortSpec(streamSortSpec); + streamPartition.setStreamId("test"); + streamPartition.setType(StreamPartition.Type.GROUPBY); + streamGroup.addStreamPartition(streamPartition); + WorkSlot workSlot = new WorkSlot("setTopologyName", "setBoltId"); + List<WorkSlot> workSlots = new ArrayList<>(); + workSlots.add(workSlot); + StreamWorkSlotQueue streamWorkSlotQueue = new StreamWorkSlotQueue(streamGroup, 
false, new HashMap<>(), workSlots); + + Assert.assertTrue(streamWorkSlotQueue.getQueueId().startsWith("SG[test-]")); + Assert.assertTrue(streamWorkSlotQueue.getDedicateOption().isEmpty()); + Assert.assertEquals(0, streamWorkSlotQueue.getNumberOfGroupBolts()); + Assert.assertEquals(1, streamWorkSlotQueue.getQueueSize()); + Assert.assertTrue(streamWorkSlotQueue.getTopoGroupStartIndex().isEmpty()); + Assert.assertEquals(-1, streamWorkSlotQueue.getTopologyGroupStartIndex("")); + Assert.assertEquals(workSlot, streamWorkSlotQueue.getWorkingSlots().get(0)); + + StreamWorkSlotQueue streamWorkSlotQueue1 = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), workSlots); + Assert.assertFalse(streamWorkSlotQueue.equals(streamWorkSlotQueue1)); + Assert.assertFalse(streamWorkSlotQueue == streamWorkSlotQueue1); + Assert.assertFalse(streamWorkSlotQueue.hashCode() == streamWorkSlotQueue1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java new file mode 100644 index 0000000..760657a --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.coordination.model.internal; + +import org.junit.Assert; +import org.junit.Test; + +public class TopologyTest { + @Test + public void testTopology() { + Topology topology = new Topology("test", 2, 3); + Assert.assertEquals(null, topology.getClusterName()); + Assert.assertEquals("test", topology.getName()); + Assert.assertEquals(null, topology.getPubBoltId()); + Assert.assertEquals(null, topology.getSpoutId()); + Assert.assertEquals(0, topology.getAlertBoltIds().size()); + Assert.assertEquals(1, topology.getAlertParallelism()); + Assert.assertEquals(0, topology.getGroupNodeIds().size()); + Assert.assertEquals(1, topology.getGroupParallelism()); + Assert.assertEquals(3, topology.getNumOfAlertBolt()); + Assert.assertEquals(2, topology.getNumOfGroupBolt()); + Assert.assertEquals(0, topology.getNumOfPublishBolt()); + Assert.assertEquals(1, topology.getNumOfSpout()); + Assert.assertEquals(1, topology.getSpoutParallelism()); + + Topology topology1 = new Topology("test", 2, 3); + + Assert.assertFalse(topology1.equals(topology)); + Assert.assertFalse(topology1.hashCode() == topology.hashCode()); + Assert.assertFalse(topology1 == topology); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java new file mode 100644 index 0000000..cc84c56 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class OverrideDeduplicatorSpecTest { + + @Test + public void testOverrideDeduplicatorSpec() { + Map<String, String> properties = new HashMap<>(); + properties.put("kafka_broker", "localhost:9092"); + properties.put("topic", "TEST_TOPIC_NAME"); + OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec(); + overrideDeduplicatorSpec.setClassName("testClass"); + overrideDeduplicatorSpec.setProperties(properties); + + OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec(); + overrideDeduplicatorSpec1.setClassName("testClass"); + overrideDeduplicatorSpec1.setProperties(properties); + + Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec); + Assert.assertTrue(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec)); + Assert.assertTrue(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode()); + + overrideDeduplicatorSpec1.setClassName("testClass1"); + + Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec); + Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec)); + Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode()); + + overrideDeduplicatorSpec1.setClassName("testClass"); + Map<String, String> properties1 = new HashMap<>(); + properties.put("kafka_broker", "localhost:9092"); + overrideDeduplicatorSpec1.setProperties(properties1); + + Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec); + Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec)); + Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode()); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java new file mode 100644 index 0000000..7acb4f7 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; + +public class PolicyDefinitionTest { + + @Test + public void testPolicyInnerDefinition() { + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("test"); + def.setType("siddhi"); + def.setHandlerClass("setHandlerClass"); + def.setProperties(new HashMap<>()); + def.setOutputStreams(Arrays.asList("outputStream")); + def.setInputStreams(Arrays.asList("inputStream")); + Assert.assertEquals("{type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }", def.toString()); + + PolicyDefinition.Definition def1 = new PolicyDefinition.Definition(); + def1.setValue("test"); + def1.setType("siddhi"); + def1.setHandlerClass("setHandlerClass"); + def1.setProperties(new HashMap<>()); + def1.setOutputStreams(Arrays.asList("outputStream")); + def1.setInputStreams(Arrays.asList("inputStream")); + + Assert.assertFalse(def == def1); + Assert.assertTrue(def.equals(def1)); + Assert.assertTrue(def.hashCode() == def1.hashCode()); + + def1.setInputStreams(Arrays.asList("inputStream1")); + + Assert.assertFalse(def.equals(def1)); + Assert.assertTrue(def.hashCode() == def1.hashCode());//problem equals() and hashCode() be inconsistent + + } + + @Test + public void testPolicyDefinition() { + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("test"); + def.setType("siddhi"); + def.setHandlerClass("setHandlerClass"); + def.setProperties(new HashMap<>()); + def.setOutputStreams(Arrays.asList("outputStream")); + def.setInputStreams(Arrays.asList("inputStream")); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams + pd.setOutputStreams(Arrays.asList("outputStream"));//confuse with 
PolicyDefinition.Definition OutputStreams + pd.setName("policyName"); + pd.setDescription(String.format("Test policy for stream %s", "streamName")); + + StreamPartition sp = new StreamPartition(); + sp.setStreamId("streamName"); + sp.setColumns(Arrays.asList("host")); + sp.setType(StreamPartition.Type.GROUPBY); + pd.addPartition(sp); + Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString()); + + PolicyDefinition pd1 = new PolicyDefinition(); + PolicyDefinition.Definition def1 = new PolicyDefinition.Definition(); + def1.setValue("test"); + def1.setType("siddhi"); + def1.setHandlerClass("setHandlerClass"); + def1.setProperties(new HashMap<>()); + def1.setOutputStreams(Arrays.asList("outputStream")); + def1.setInputStreams(Arrays.asList("inputStream")); + pd1.setDefinition(def1); + pd1.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams + pd1.setOutputStreams(Arrays.asList("outputStream"));//confuse with PolicyDefinition.Definition OutputStreams + pd1.setName("policyName"); + pd1.setDescription(String.format("Test policy for stream %s", "streamName")); + + StreamPartition sp1 = new StreamPartition(); + sp1.setStreamId("streamName"); + sp1.setColumns(Arrays.asList("host")); + sp1.setType(StreamPartition.Type.GROUPBY); + pd1.addPartition(sp1); + + + Assert.assertFalse(pd == pd1); + Assert.assertTrue(pd.equals(pd1)); + Assert.assertTrue(pd.hashCode() == pd1.hashCode()); + sp1.setStreamId("streamName1"); + + Assert.assertFalse(pd == pd1); + Assert.assertFalse(pd.equals(pd1)); + Assert.assertFalse(pd.hashCode() == pd1.hashCode()); + + sp1.setStreamId("streamName"); + def1.setOutputStreams(Arrays.asList("outputStream1")); + + Assert.assertFalse(pd == pd1); + Assert.assertFalse(pd.equals(pd1)); + + Assert.assertTrue(pd.hashCode() == pd1.hashCode());//problem equals() and hashCode() be inconsistent + + } + + @Test 
+ public void testPolicyDefinitionEqualByPolicyStatus() { + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + PolicyDefinition policy1 = new PolicyDefinition(); + policy1.setName("policy1"); + policy1.setDefinition(definition); + + PolicyDefinition policy2 = new PolicyDefinition(); + policy2.setName("policy1"); + policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED); + policy2.setDefinition(definition); + + PolicyDefinition policy3 = new PolicyDefinition(); + policy3.setName("policy1"); + policy3.setDefinition(definition); + + Assert.assertTrue(policy1.equals(policy3)); + Assert.assertFalse(policy1.equals(policy2)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java new file mode 100644 index 0000000..494d8ca --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +public class PublishmentTest { + @Test + public void testPublishment() { + Map<String, Object> properties = new HashMap<>(); + properties.put("kafka_broker", "localhost:9092"); + properties.put("topic", "TEST_TOPIC_NAME"); + + List<Map<String, Object>> kafkaClientConfig = new ArrayList<>(); + kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync")); + properties.put("kafka_client_config", kafkaClientConfig); + + PolicyDefinition policy = createPolicy("testStream", "testPolicy"); + Publishment publishment = new Publishment(); + publishment.setName("testAsyncPublishment"); + publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher"); + publishment.setPolicyIds(Arrays.asList(policy.getName())); + publishment.setDedupIntervalMin("PT0M"); + OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec(); + overrideDeduplicatorSpec.setClassName("testClass"); + publishment.setOverrideDeduplicator(overrideDeduplicatorSpec); + publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"); + publishment.setProperties(properties); + + Assert.assertEquals("Publishment[name:testAsyncPublishment,type:org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher,policyId:[testPolicy],properties:{kafka_client_config=[{name=producer.type, value=sync}], topic=TEST_TOPIC_NAME, 
kafka_broker=localhost:9092}", publishment.toString()); + + + Publishment publishment1 = new Publishment(); + publishment1.setName("testAsyncPublishment"); + publishment1.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher"); + publishment1.setPolicyIds(Arrays.asList(policy.getName())); + publishment1.setDedupIntervalMin("PT0M"); + OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec(); + overrideDeduplicatorSpec1.setClassName("testClass"); + publishment1.setOverrideDeduplicator(overrideDeduplicatorSpec1); + publishment1.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"); + publishment1.setProperties(properties); + + Assert.assertTrue(publishment.equals(publishment1)); + Assert.assertTrue(publishment.hashCode() == publishment1.hashCode()); + Assert.assertFalse(publishment == publishment1); + publishment1.getOverrideDeduplicator().setClassName("testClass1"); + + + Assert.assertFalse(publishment.equals(publishment1)); + Assert.assertFalse(publishment.hashCode() == publishment1.hashCode()); + Assert.assertFalse(publishment == publishment1); + + publishment1.getOverrideDeduplicator().setClassName("testClass"); + publishment1.setStreamIds(Arrays.asList("streamid1,streamid2")); + Assert.assertFalse(publishment.equals(publishment1)); + Assert.assertFalse(publishment.hashCode() == publishment1.hashCode()); + Assert.assertFalse(publishment == publishment1); + } + + private PolicyDefinition createPolicy(String streamName, String policyName) { + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + // expression, something like "PT5S,dynamic,1,host" + def.setValue("test"); + def.setType("siddhi"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList("inputStream")); + pd.setOutputStreams(Arrays.asList("outputStream")); + pd.setName(policyName); + pd.setDescription(String.format("Test policy for stream %s", streamName)); + + 
StreamPartition sp = new StreamPartition(); + sp.setStreamId(streamName); + sp.setColumns(Arrays.asList("host")); + sp.setType(StreamPartition.Type.GROUPBY); + pd.addPartition(sp); + return pd; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java new file mode 100644 index 0000000..91f9cf8 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +public class PublishmentTypeTest { + @Test + public void testPublishmentType() { + PublishmentType publishmentType = new PublishmentType(); + publishmentType.setType("KAFKA"); + publishmentType.setClassName("setClassName"); + publishmentType.setDescription("setDescription"); + publishmentType.setFields("setFields"); + + PublishmentType publishmentType1 = new PublishmentType(); + publishmentType1.setType("KAFKA"); + publishmentType1.setClassName("setClassName"); + publishmentType1.setDescription("setDescription"); + publishmentType1.setFields("setFields"); + + Assert.assertFalse(publishmentType.equals(new String(""))); + Assert.assertFalse(publishmentType == publishmentType1); + Assert.assertTrue(publishmentType.equals(publishmentType1)); + Assert.assertTrue(publishmentType.hashCode() == publishmentType1.hashCode()); + + publishmentType1.setType("JMS"); + + Assert.assertFalse(publishmentType.equals(publishmentType1)); + Assert.assertFalse(publishmentType.hashCode() == publishmentType1.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java new file mode 100644 index 0000000..ccc7717 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; + + +public class StreamColumnTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testStreamStringColumn() { + StreamColumn streamColumn = new StreamColumn.Builder().name("NAMEyhd").type(StreamColumn.Type.STRING).defaultValue("EAGLEyhd").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,NAMEyhd"); + Assert.assertEquals("StreamColumn=name[NAMEyhd], type=[string], defaultValue=[EAGLEyhd], required=[true], nodataExpression=[PT1M,dynamic,1,NAMEyhd]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof String); + } + + @Test + public void testStreamLongColumn() { + thrown.expect(NumberFormatException.class); + new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("eagle").required(true).build(); + } + + @Test + public void testStreamLongColumn1() { + StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("0").required(true).build(); + 
streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[long], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof Long); + } + + @Test + public void testStreamDoubleColumn() { + thrown.expect(NumberFormatException.class); + new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("eagle").required(true).build(); + } + + @Test + public void testStreamDoubleColumn1() { + StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("0.1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + + streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("-0.1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + + streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof Double); + } + + @Test + public void testStreamFloatColumn() { + thrown.expect(NumberFormatException.class); + new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("eagle").required(true).build(); + } + + @Test + 
public void testStreamFloatColumn1() { + StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("0.1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + + streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("-0.1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + + streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof Float); + } + + @Test + public void testStreamIntColumn() { + thrown.expect(NumberFormatException.class); + new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("eagle").required(true).build(); + } + + @Test + public void testStreamIntColumn1() { + thrown.expect(NumberFormatException.class); + new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0.1").required(true).build(); + } + + + @Test + public void testStreamIntColumn2() { + StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("1").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[1], 
required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + + streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,salary"); + Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof Integer); + } + + @Test + public void testStreamBoolColumn() { + StreamColumn streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("eagle").required(false).build(); + streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd"); + Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[false], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString()); + streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("1").required(true).build(); + streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd"); + Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString()); + streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("0").required(true).build(); + streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd"); + Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString()); + streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("True").required(true).build(); + streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd"); + Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[true], required=[true], 
nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString()); + Assert.assertTrue(streamBoolColumn.getDefaultValue() instanceof Boolean); + } + + @Test + public void testStreamObjectColumn() { + thrown.expect(IllegalArgumentException.class); + new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("eagle").required(true).build(); + } + + @Test + public void testStreamObjectColumn1() { + StreamColumn streamColumn = new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}").required(true).build(); + streamColumn.setNodataExpression("PT1M,dynamic,1,name"); + Assert.assertEquals("StreamColumn=name[name], type=[object], defaultValue=[{name=heap.COMMITTED, Value=175636480}], required=[true], nodataExpression=[PT1M,dynamic,1,name]", streamColumn.toString()); + Assert.assertTrue(streamColumn.getDefaultValue() instanceof HashMap); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java new file mode 100644 index 0000000..b5015cd --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class StreamDefinitionTest { + @Test + public void testStreamDefinition() { + + List<StreamColumn> streamColumns = new ArrayList<>(); + streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); + streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build()); + streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build()); + streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build()); + streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); + + StreamDefinition streamDefinition = new StreamDefinition(); + Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", streamDefinition.toString()); + streamDefinition.setColumns(streamColumns); + + Assert.assertEquals(3, streamDefinition.getColumnIndex("data")); + Assert.assertEquals(-1, streamDefinition.getColumnIndex("DATA")); + Assert.assertEquals(-1, streamDefinition.getColumnIndex("isYhd")); + Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, 
timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition.toString()); + StreamDefinition streamDefinition1 = streamDefinition.copy(); + Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString()); + + Assert.assertFalse(streamDefinition1.equals(streamDefinition)); + Assert.assertFalse(streamDefinition1 == streamDefinition); + Assert.assertFalse(streamDefinition1.hashCode() == streamDefinition.hashCode()); + } +}