http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java deleted file mode 100644 index e7efbd7..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/* License header ends here. */
package org.apache.alert.coordinator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.junit.Assert;
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * Tests for {@link ScheduleContextBuilder}.
 *
 * <p>Each test seeds an {@link InMemMetadataServiceClient} with one topology, one data source,
 * one policy, one publishment, one stream definition and one schedule state, builds an
 * {@link IScheduleContext} from it, then mutates the metadata and rebuilds the context to check
 * which parts of the previous schedule state are kept or dropped.
 *
 * @since May 5, 2016
 */
public class ScheduleContextBuilderTest {

    private static final String TOPO1 = "topo1";
    private static final String V1 = "v1";
    private static final String COL1 = "col1";
    private static final String OUT_STREAM1 = "out-stream1";
    private static final String TEST_POLICY_1 = "test-policy-1";
    private static final String TEST_STREAM_DEF_1 = "testStreamDef";
    private static final String TEST_DATASOURCE_1 = "test-datasource-1";

    /** Parallelism hint of the sample policy; the sample work-slot queue is given as many slots. */
    private static final int SAMPLE_PARALLELISM = 5;
    /** No-data expression attached to the sample column by the "nodata alert" fixture. */
    private static final String NODATA_EXPRESSION = "PT5S,dynamic,1," + COL1;

    /*
     * Shared fixture state. createPolicy() assigns streamGroup and createMonitoredStream()
     * assigns queueId, so the fixture factories must run in the order
     * createPolicy() -> createMonitoredStream() -> createAssignment().
     * newSampleClient(...) below respects that order.
     */
    private static String queueId;
    private static StreamGroup streamGroup;

    private final Config config = ConfigFactory.load().getConfig("coordinator");

    /** A freshly built context reports the sample policy on the topology and on alert bolts 0-2. */
    @Test
    public void test() {
        InMemMetadataServiceClient client = getSampleMetadataService();

        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);

        IScheduleContext context = builder.buildContext();

        // assert topology usage
        Map<String, TopologyUsage> usages = context.getTopologyUsages();
        Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size());
        Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1));

        // only the first three alert bolts are inspected
        List<String> checkedBolts = Arrays.asList(alertBoltId(0), alertBoltId(1), alertBoltId(2));
        for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
            if (checkedBolts.contains(u.getBoltId())) {
                Assert.assertEquals(1, u.getPolicies().size());
                Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
                Assert.assertEquals(1, u.getPartitions().size());
                Assert.assertEquals(1, u.getReferQueues().size());
            }
        }
    }

    /** Removing the policy from the metadata drops its assignment and its topology usage entry. */
    @Test
    public void test_remove_policy() {
        InMemMetadataServiceClient client = getSampleMetadataService();
        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);

        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);

        IScheduleContext context = builder.buildContext();
        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();

        client.removePolicy(0);
        context = builder.buildContext();
        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));

        WorkSlot slot = queue.getWorkingSlots().get(0);
        Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies();
        // The set holds policy names, so the removed policy's name is what must be absent.
        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
        Assert.assertEquals(0, topoPolicies.size());
    }

    /**
     * Changing the policy's partition spec invalidates the assignment, the monitored stream and
     * every usage record that referred to them.
     */
    @Test
    public void test_changed_policy_partition() {
        InMemMetadataServiceClient client = getSampleMetadataService();
        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);

        IScheduleContext context = builder.buildContext();
        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));

        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();

        PolicyDefinition pd1 = client.listPolicies().get(0);
        // Add a new group-by column. The partition spec is replaced by a copy so that the policy
        // does not keep referencing the same object within the same JVM (there is no
        // serialization / deserialization round trip in this in-memory setup).
        StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0));
        par.getColumns().add("s1");
        pd1.getPartitionSpec().clear();
        pd1.getPartitionSpec().add(par);

        context = builder.buildContext();

        // assert the policy assignment is removed
        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
        // assert the monitored stream is removed as no policy is on it now
        Assert.assertEquals(0, context.getMonitoredStreams().size());
        // assert the topology usage doesn't contain the policy
        WorkSlot slot = queue.getWorkingSlots().get(0);
        TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName);
        Set<String> topoPolicies = topologyUsage.getPolicies();
        // The set holds policy names, so the policy's name is what must be absent.
        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
        Assert.assertEquals(0, topoPolicies.size());
        // assert the topology usage doesn't contain the monitored stream
        Assert.assertEquals(0, topologyUsage.getMonitoredStream().size());
        // assert the alert bolt usage doesn't have the queue reference
        Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size());
    }

    /** A parallelism hint that differs from the queue size, smaller or bigger, drops the assignment. */
    @Test
    public void test_changed_policy_parallelism() {
        InMemMetadataServiceClient client = getSampleMetadataService();
        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);

        IScheduleContext context = builder.buildContext();
        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));

        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();

        PolicyDefinition pd1 = client.listPolicies().get(0);
        // the sample queue has 5 slots; a smaller hint is treated the same as a bigger one
        pd1.setParallelismHint(4);

        context = builder.buildContext();
        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());

        // one more than the queue size: the policy assignment is removed as well
        pd1.setParallelismHint(queue.getQueueSize() + 1);
        context = builder.buildContext();

        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());
    }

    /** Changing only the policy definition text keeps the policy on its original queue. */
    @Test
    public void test_changed_policy_definition() {
        InMemMetadataServiceClient client = getSampleMetadataService();
        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);

        IScheduleContext context = builder.buildContext();
        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));

        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();

        PolicyDefinition pd1 = client.listPolicies().get(0);
        pd1.getDefinition().value = "define.. new...";

        context = builder.buildContext();
        PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
        StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
        Assert.assertNotNull(queueNew);
        // just to make sure queueNew is present
        Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
    }

    /**
     * A stream column carrying a no-data expression makes the builder generate a no-data alert
     * policy, an aggregation policy, an aggregation data source and a publishment.
     */
    @Test
    public void test_stream_noalert_policies_generation() throws Exception {
        InMemMetadataServiceClient client = getSampleMetadataServiceWithNodataAlert();

        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
        IScheduleContext context = builder.buildContext();

        // the generated policy names are only known by suffix, so scan for them
        PolicyDefinition policyDefinition = null;
        PolicyDefinition aggrPolicyDefinition = null;
        for (Entry<String, PolicyDefinition> entry : context.getPolicies().entrySet()) {
            if (entry.getKey().endsWith("_nodata_alert")) {
                policyDefinition = entry.getValue();
            } else if (entry.getKey().endsWith("_aggregation_stream_policy")) {
                aggrPolicyDefinition = entry.getValue();
            }
        }
        Assert.assertEquals(3, context.getPolicies().size());

        Assert.assertNotNull(policyDefinition);
        Assert.assertEquals("nodataalert", policyDefinition.getDefinition().getType());
        Assert.assertEquals(NODATA_EXPRESSION, policyDefinition.getDefinition().getValue());

        Assert.assertNotNull(aggrPolicyDefinition);
        Assert.assertEquals("siddhi", aggrPolicyDefinition.getDefinition().getType());

        Kafka2TupleMetadata datasource = context.getDataSourceMetadata().get("nodata_alert_aggregation_ds");
        Assert.assertNotNull(datasource);

        String publishmentName = policyDefinition.getName() + "_publish";
        Publishment publishment = context.getPublishments().get(publishmentName);
        Assert.assertNotNull(publishment);
    }

    /** Renaming the topology drops the policy assignment that was bound to the old name. */
    @Test
    public void test_renamed_topologies() {
        InMemMetadataServiceClient client = getSampleMetadataService();
        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);

        IScheduleContext context = builder.buildContext();
        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));

        Topology t = client.listTopologies().get(0);
        t.setName("newName");

        context = builder.buildContext();
        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
    }

    /**
     * Builds an in-memory metadata client holding the sample topology, data source, policy,
     * publishment, stream definition and schedule state.
     *
     * @return a new, fully populated client
     */
    public static InMemMetadataServiceClient getSampleMetadataService() {
        return newSampleClient(createStreamDefinition());
    }

    /**
     * Same as {@link #getSampleMetadataService()} except that the stream definition's column
     * carries a no-data expression.
     *
     * @return a new, fully populated client
     */
    public static InMemMetadataServiceClient getSampleMetadataServiceWithNodataAlert() {
        return newSampleClient(createStreamDefinitionWithNodataAlert());
    }

    /** Populates a new client; the policy is created before the schedule state (see fixture note). */
    private static InMemMetadataServiceClient newSampleClient(StreamDefinition streamDefinition) {
        InMemMetadataServiceClient client = new InMemMetadataServiceClient();
        client.addTopology(createSampleTopology());
        client.addDataSource(createKafka2TupleMetadata());
        client.addPolicy(createPolicy());
        client.addPublishment(createPublishment());
        client.addStreamDefinition(streamDefinition);
        client.addScheduleState(createScheduleState());
        return client;
    }

    private static StreamDefinition createStreamDefinitionWithNodataAlert() {
        return newStreamDefinition(NODATA_EXPRESSION);
    }

    private static StreamDefinition createStreamDefinition() {
        return newStreamDefinition(null);
    }

    /**
     * Creates the sample stream definition with a single required string column.
     *
     * @param nodataExpression expression to set on the column, or {@code null} to leave it unset
     */
    private static StreamDefinition newStreamDefinition(String nodataExpression) {
        StreamDefinition def = new StreamDefinition();
        def.setStreamId(TEST_STREAM_DEF_1);
        def.setDataSource(TEST_DATASOURCE_1);

        StreamColumn col = new StreamColumn();
        col.setName(COL1);
        col.setRequired(true);
        col.setType(Type.STRING);
        if (nodataExpression != null) {
            col.setNodataExpression(nodataExpression);
        }
        def.getColumns().add(col);

        return def;
    }

    private static ScheduleState createScheduleState() {
        ScheduleState ss = new ScheduleState();
        ss.setVersion(V1);

        // createMonitoredStream() must run first: it sets the queueId read by createAssignment()
        ss.getMonitoredStreams().add(createMonitoredStream());
        ss.getAssignments().add(createAssignment());

        return ss;
    }

    /** Creates a monitored stream with one queue of alert bolts 0..4 and records the queue id. */
    private static MonitoredStream createMonitoredStream() {
        MonitoredStream ms = new MonitoredStream(streamGroup);
        ms.setVersion(V1);

        List<WorkSlot> slots = new ArrayList<WorkSlot>();
        for (int i = 0; i < SAMPLE_PARALLELISM; i++) {
            slots.add(new WorkSlot(TOPO1, alertBoltId(i)));
        }

        StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
        ms.addQueues(q);
        queueId = q.getQueueId();
        return ms;
    }

    private static PolicyAssignment createAssignment() {
        return new PolicyAssignment(TEST_POLICY_1, queueId);
    }

    /** Creates the sample policy and, as a side effect, the shared {@code streamGroup}. */
    private static PolicyDefinition createPolicy() {
        PolicyDefinition def = new PolicyDefinition();
        def.setName(TEST_POLICY_1);
        def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
        def.setOutputStreams(Arrays.asList(OUT_STREAM1));
        def.setParallelismHint(SAMPLE_PARALLELISM);
        def.setDefinition(new Definition());

        streamGroup = new StreamGroup();
        StreamPartition par = new StreamPartition();
        par.setStreamId(TEST_STREAM_DEF_1);
        par.getColumns().add(COL1);

        // no sort column is set; only the window margin and period
        StreamSortSpec sortSpec = new StreamSortSpec();
        sortSpec.setWindowMargin(3);
        sortSpec.setWindowPeriod("PT1M");

        par.setSortSpec(sortSpec);
        streamGroup.addStreamPartition(par);

        List<StreamPartition> lists = new ArrayList<StreamPartition>();
        lists.add(par);
        def.setPartitionSpec(lists);
        return def;
    }

    private static Publishment createPublishment() {
        Publishment pub = new Publishment();
        pub.setType("KAFKA");
        pub.setName("test-stream-output");
        pub.setPolicyIds(Arrays.asList(TEST_POLICY_1));
        return pub;
    }

    private static Kafka2TupleMetadata createKafka2TupleMetadata() {
        Kafka2TupleMetadata ktm = new Kafka2TupleMetadata();
        ktm.setName(TEST_DATASOURCE_1);
        ktm.setSchemeCls("SchemeClass");
        ktm.setTopic("tupleTopic");
        ktm.setType("KAFKA");
        ktm.setCodec(new Tuple2StreamMetadata());
        return ktm;
    }

    /** Creates topology {@code topo1} and registers one id per group bolt and per alert bolt. */
    private static Topology createSampleTopology() {
        Topology t = new Topology(TOPO1, 3, 10);
        for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
            t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
        }
        for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
            t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
        }
        return t;
    }

    /** Returns the id of the {@code index}-th alert bolt of the sample topology. */
    private static String alertBoltId(int index) {
        return TOPO1 + "-alert-" + index;
    }

}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java deleted file mode 100644 index 1bfdd7b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java +++ /dev/null @@ -1,724 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.alert.coordinator; - -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.alert.coordinator.mock.TestTopologyMgmtService; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.eagle.alert.coordination.model.*; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.*; - -/** - * @since Apr 22, 2016 - */ -public class SchedulerTest { - - private static final String TOPO2 = "topo2"; - private static final String TOPO1 = "topo1"; - private static final int PARALELLISM = 5; - private static final String STREAM2 = "stream2"; - private static final String JOIN_POLICY_1 = 
"join-policy-1"; - private static final String TEST_TOPIC = "test-topic"; - private static final String TEST_POLICY_1 = "test-policy1"; - private static final String TEST_POLICY_2 = "test-policy2"; - private static final String TEST_POLICY_3 = "test-policy3"; - private static final String STREAM1 = "stream1"; - private static final String DS_NAME = "ds1"; - private static ObjectMapper mapper = new ObjectMapper(); - private static final Logger LOG = LoggerFactory.getLogger(SchedulerTest.class); - - @BeforeClass - public static void setup() { - ConfigFactory.invalidateCaches(); - System.setProperty("config.resource", "/application.conf"); - } - - @Test - public void test01_simple() throws Exception { - GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - TestTopologyMgmtService mgmtService = createMgmtService(); - IScheduleContext context = createScheduleContext(mgmtService); - ps.init(context, mgmtService); - ps.schedule(new ScheduleOption()); - - ScheduleState status = ps.getState(); - context = ps.getContext(); // context updated! 
- Map<String, SpoutSpec> spec = status.getSpoutSpecs(); - - LOG.info(mapper.writeValueAsString(spec)); - Assert.assertEquals(2, spec.size()); - Assert.assertTrue(spec.containsKey(TOPO1)); - assertFirstPolicyScheduled(context, status); - } - - private void assertFirstPolicyScheduled(IScheduleContext context, ScheduleState status) { - String version = status.getVersion(); - // assert spout spec - { - Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator(); - { - // assert spout 1 - SpoutSpec ss = it.next(); - Assert.assertEquals(version, ss.getVersion()); - Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size()); - Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next()); - - Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size()); - List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next(); - Assert.assertEquals(1, metas.size()); - - StreamRepartitionMetadata streamMeta = metas.iterator().next(); - Assert.assertEquals(STREAM1, streamMeta.getStreamId()); - Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName()); - Assert.assertEquals(1, streamMeta.groupingStrategies.size()); - - StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); - Assert.assertEquals(0, gs.startSequence); - - Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - } - { - // assert spout 2 - SpoutSpec ss = it.next(); - Assert.assertEquals(version, ss.getVersion()); - Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size()); - } - } - // assert grp-by spec - { - Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator(); - { - Assert.assertEquals(2, status.getGroupSpecs().values().size()); - 
Assert.assertTrue(gsit.hasNext()); - RouterSpec gspec = gsit.next(); - Assert.assertEquals(version, gspec.getVersion()); - String topo1 = gspec.getTopologyName(); - LOG.info("group spec topology name:", topo1); - List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs(); - Assert.assertEquals(1, routeSpecs.size()); - for (StreamRouterSpec spec : routeSpecs) { - StreamPartition par = spec.getPartition(); - Assert.assertEquals(STREAM1, par.getStreamId()); - Assert.assertEquals(Arrays.asList("col1"), par.getColumns()); - Assert.assertEquals(STREAM1, spec.getStreamId()); - - Assert.assertEquals(1, spec.getTargetQueue().size()); - List<PolicyWorkerQueue> queues = spec.getTargetQueue(); - Assert.assertEquals(1, queues.size()); - Assert.assertEquals(5, queues.get(0).getWorkers().size()); - for (WorkSlot slot : queues.get(0).getWorkers()) { - Assert.assertEquals(topo1, slot.getTopologyName()); - LOG.info(slot.getBoltId()); - } - } - } - // grp-spec2 - { - RouterSpec gs2 = gsit.next(); - Assert.assertEquals(version, gs2.getVersion()); - List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs(); - Assert.assertEquals(0, routeSpecs.size()); - } - } - // alert spec - { - Assert.assertEquals(2, status.getAlertSpecs().values().size()); - Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator(); - // topo1 - { - AlertBoltSpec alertSpec = asit.next(); - Assert.assertEquals(version, alertSpec.getVersion()); - String topo1 = alertSpec.getTopologyName(); - LOG.info("alert spec topology name {}", topo1); - for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) { - Assert.assertEquals(1, definitions.size()); - Assert.assertEquals(TEST_POLICY_1, definitions.get(0)); - } - } - // topo2 - { - AlertBoltSpec alertSpec = asit.next(); - Assert.assertEquals(version, alertSpec.getVersion()); - String topo1 = alertSpec.getTopologyName(); - LOG.info("alert spec topology name {}", topo1); - Assert.assertEquals(0, alertSpec.getBoltPolicyIdsMap().size()); - } - } 
- } - - private TestTopologyMgmtService createMgmtService() { - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); - return mgmtService; - } - - private InMemScheduleConext createScheduleContext(TestTopologyMgmtService mgmtService) { - InMemScheduleConext context = new InMemScheduleConext(); - // topo - Pair<Topology, TopologyUsage> pair1 = mgmtService.createEmptyTopology(TOPO1); - Pair<Topology, TopologyUsage> pair2 = mgmtService.createEmptyTopology(TOPO2); - context.addTopology(pair1.getLeft()); - context.addTopologyUsages(pair1.getRight()); - context.addTopology(pair2.getLeft()); - context.addTopologyUsages(pair2.getRight()); - - // policy - createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM); - - // data source - Kafka2TupleMetadata ds = new Kafka2TupleMetadata(); - ds.setName(DS_NAME); - ds.setTopic(TEST_TOPIC); - ds.setCodec(new Tuple2StreamMetadata()); - context.addDataSource(ds); - - // schema - { - StreamDefinition schema = new StreamDefinition(); - { - StreamColumn c = new StreamColumn(); - c.setName("col1"); - c.setType(Type.STRING); - c.setDefaultValue("dflt"); - schema.getColumns().add(c); - } - { - StreamColumn c = new StreamColumn(); - c.setName("col2"); - c.setType(Type.DOUBLE); - c.setDefaultValue("0.0"); - schema.getColumns().add(c); - } - schema.setStreamId(STREAM1); - schema.setValidate(false); - schema.setDataSource(DS_NAME); - context.addSchema(schema); - } - { - StreamDefinition schema = new StreamDefinition(); - { - StreamColumn c = new StreamColumn(); - c.setName("col1"); - c.setType(Type.STRING); - c.setDefaultValue("dflt"); - schema.getColumns().add(c); - } - schema.setStreamId(STREAM2); - schema.setValidate(false); - schema.setDataSource(DS_NAME); - context.addSchema(schema); - } - - return context; - } - - /** - * Add policy after add policy - */ - @Test - public void test_schedule_add2() { - TestTopologyMgmtService mgmtService = createMgmtService(); - IScheduleContext context = 
createScheduleContext(mgmtService); - GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - ps.init(context, mgmtService); - - ScheduleOption option = new ScheduleOption(); - ps.schedule(option); - ScheduleState status = ps.getState(); - context = ps.getContext(); // context updated! - assertFirstPolicyScheduled(context, status); - - createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1, PARALELLISM); - - ps.init(context, mgmtService); // reinit - ps.schedule(option); - status = ps.getState(); - context = ps.getContext(); // context updated! - // now assert two policy on the same queue - assertSecondPolicyCreated(context, status); - - // add one policy on different stream of the same topic - createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2, PARALELLISM); - - ps.init(context, mgmtService); // re-init - ps.schedule(option); - status = ps.getState(); - context = ps.getContext(); // context updated! - assertThridPolicyScheduled(context, status); - } - - private void assertThridPolicyScheduled(IScheduleContext context, ScheduleState status) { - { - // now assert two policy on the same queue - Assert.assertEquals(2, status.getSpoutSpecs().values().size()); - Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator(); - { - // assert spout 1 - SpoutSpec ss = it.next(); - Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size()); - Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next()); - - Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size()); - List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next(); - Assert.assertEquals(1, metas.size()); - - StreamRepartitionMetadata streamMeta = metas.iterator().next(); - Assert.assertEquals(STREAM1, streamMeta.getStreamId()); - Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName()); - Assert.assertEquals(1, streamMeta.groupingStrategies.size()); - - StreamRepartitionStrategy 
gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); - Assert.assertEquals(0, gs.startSequence); - - PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1); - PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2); - Assert.assertNotNull(pa1); - Assert.assertNotNull(pa2); - Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId()); - } - { - // assert spout 2 - SpoutSpec ss = it.next(); - Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size()); - - Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next()); - - Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size()); - List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next(); - Assert.assertEquals(1, metas.size()); - - StreamRepartitionMetadata streamMeta = metas.iterator().next(); - Assert.assertEquals(STREAM2, streamMeta.getStreamId()); - Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName()); - Assert.assertEquals(1, streamMeta.groupingStrategies.size()); - - StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); - Assert.assertEquals(0, gs.startSequence); - - // assert policy assignment for the three policies - PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1); - PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2); - PolicyAssignment pa3 = context.getPolicyAssignments().get(TEST_POLICY_3); - Assert.assertNotNull(pa1); - Assert.assertNotNull(pa2); - Assert.assertNotNull(pa3); - Assert.assertEquals(pa1.getQueueId(), 
pa2.getQueueId()); - Assert.assertNotEquals(pa1.getQueueId(), pa3.getQueueId()); - StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight(); - StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight(); - Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(), - queue3.getWorkingSlots().get(0).getTopologyName()); - } - } - // group spec - { - Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator(); - Assert.assertEquals(2, status.getGroupSpecs().values().size()); - { - // first topology's grp - spec - gsit.next(); - // should be same with second policy scheduled, not assert here - } - { - // second topology's grp - spec - RouterSpec spec = gsit.next(); - Assert.assertEquals(1, spec.getRouterSpecs().size()); - StreamRouterSpec routeSpec = spec.getRouterSpecs().get(0); - Assert.assertEquals(STREAM2, routeSpec.getStreamId()); - Assert.assertEquals(Arrays.asList("col1"), routeSpec.getPartition().getColumns()); - } - } - // alert spec - { - Assert.assertEquals(2, status.getAlertSpecs().values().size()); - Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator(); - { - // same to the two policy case, not assert here - asit.next(); - } - { - // seconds topology's alert spec - AlertBoltSpec as = asit.next(); - Assert.assertEquals(5, as.getBoltPolicyIdsMap().size()); - for (List<String> pdList : as.getBoltPolicyIdsMap().values()) { - Assert.assertEquals(1, pdList.size()); - Assert.assertEquals(TEST_POLICY_3, pdList.get(0)); - } - } - } - } - - private void assertSecondPolicyCreated(IScheduleContext context, ScheduleState status) { - String version = status.getVersion(); - { - // spout : assert two policy on the same topology (same worker - // queue) - Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator(); - { - // assert spout 1 has two policy - SpoutSpec ss = it.next(); - Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size()); - Assert.assertEquals(TEST_TOPIC, 
ss.getKafka2TupleMetadataMap().keySet().iterator().next()); - - Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size()); - List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next(); - Assert.assertEquals(1, metas.size()); - - StreamRepartitionMetadata streamMeta = metas.iterator().next(); - Assert.assertEquals(STREAM1, streamMeta.getStreamId()); - Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName()); - Assert.assertEquals(1, streamMeta.groupingStrategies.size()); - - StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); - Assert.assertEquals(0, gs.startSequence); - - // assert two policy on the same queue - PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1); - PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2); - Assert.assertNotNull(pa1); - Assert.assertNotNull(pa2); - Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId()); - StreamWorkSlotQueue queue = getQueue(context, pa1.getQueueId()).getRight(); - Assert.assertNotNull(queue); - } - { - // assert spout 2 is still empty - SpoutSpec ss = it.next(); - Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size()); - } - } - - // assert grp-by spec. 
This is nothing different compare to first policy - { - Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator(); - { - Assert.assertEquals(2, status.getGroupSpecs().values().size()); - Assert.assertTrue(gsit.hasNext()); - RouterSpec gspec = gsit.next(); - Assert.assertEquals(version, gspec.getVersion()); - String topo1 = gspec.getTopologyName(); - LOG.info("group spec topology name:", topo1); - List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs(); - Assert.assertEquals(1, routeSpecs.size()); - for (StreamRouterSpec spec : routeSpecs) { - StreamPartition par = spec.getPartition(); - Assert.assertEquals(STREAM1, par.getStreamId()); - Assert.assertEquals(Arrays.asList("col1"), par.getColumns()); - Assert.assertEquals(STREAM1, spec.getStreamId()); - - Assert.assertEquals(1, spec.getTargetQueue().size()); - List<PolicyWorkerQueue> queues = spec.getTargetQueue(); - Assert.assertEquals(1, queues.size()); - Assert.assertEquals(5, queues.get(0).getWorkers().size()); - for (WorkSlot slot : queues.get(0).getWorkers()) { - Assert.assertEquals(topo1, slot.getTopologyName()); - LOG.info(slot.getBoltId()); - } - } - } - // grp-spec for second topology is still empty - { - RouterSpec gs2 = gsit.next(); - Assert.assertEquals(version, gs2.getVersion()); - List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs(); - Assert.assertEquals(0, routeSpecs.size()); - } - } - // alert spec - { - Assert.assertEquals(2, status.getAlertSpecs().values().size()); - Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator(); - { - AlertBoltSpec alertSpec = asit.next(); - Assert.assertEquals(version, alertSpec.getVersion()); - String topo1 = alertSpec.getTopologyName(); - LOG.info("alert spec topology name {}", topo1); - for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) { - Assert.assertEquals(2, definitions.size()); - // List<String> names = Arrays.asList(definitions.stream().map((t) -> - // t.getName()).toArray(String[]::new)); - 
Assert.assertTrue(definitions.contains(TEST_POLICY_1)); - Assert.assertTrue(definitions.contains(TEST_POLICY_2)); - } - } - // second spout - { - AlertBoltSpec spec = asit.next(); - Assert.assertEquals(0, spec.getBoltPolicyIdsMap().size()); - } - } - } - - public static Pair<MonitoredStream, StreamWorkSlotQueue> getQueue(IScheduleContext context, String queueId) { - for (MonitoredStream ms : context.getMonitoredStreams().values()) { - for (StreamWorkSlotQueue q : ms.getQueues()) { - if (q.getQueueId().equals(queueId)) { - return Pair.of(ms, q); - } - } - } - return null; - } - - @Test - public void testGroupEquals() { - StreamRepartitionStrategy gs1 = new StreamRepartitionStrategy(); - StreamPartition sp = new StreamPartition(); - sp.setColumns(Arrays.asList("col1")); - sp.setSortSpec(new StreamSortSpec()); - sp.setStreamId("testStream"); - sp.setType(StreamPartition.Type.GROUPBY); - gs1.partition = sp; - - StreamRepartitionStrategy gs2 = new StreamRepartitionStrategy(); - sp = new StreamPartition(); - sp.setColumns(Arrays.asList("col1")); - sp.setSortSpec(new StreamSortSpec()); - sp.setStreamId("testStream"); - sp.setType(StreamPartition.Type.GROUPBY); - gs2.partition = sp; - - Assert.assertTrue(gs1.equals(gs2)); - List<StreamRepartitionStrategy> list = new ArrayList<StreamRepartitionStrategy>(); - list.add(gs1); - Assert.assertTrue(list.contains(gs2)); - } - - private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream, int hint) { - PolicyDefinition pd = new PolicyDefinition(); - pd.setParallelismHint(hint); - Definition def = new Definition(); - pd.setDefinition(def); - pd.setName(policyName); - pd.setInputStreams(Arrays.asList(stream)); - pd.setOutputStreams(Arrays.asList("outputStream2")); - StreamPartition par = new StreamPartition(); - par.setColumns(Arrays.asList("col1")); - par.setType(StreamPartition.Type.GLOBAL); - par.setStreamId(stream); - pd.setPartitionSpec(Arrays.asList(par)); - context.addPoilcy(pd); - } - - /** - 
* Add and remove - */ - @Test - public void test_schedule2_remove() { - // TODO - } - - @Test - public void test_schedule_updateParitition() { - // This case design test is move to outter logic of ScheduleConetxtBuilder - } - - @Test - public void test_schedule_updateDefinition() { - // This case design test is move to outter logic of ScheduleConetxtBuilder - } - - @Test - public void test_schedule_nogroupby() { - // TODO - } - - @SuppressWarnings("unused") - @Test - public void test_schedule_multipleStream() throws Exception { - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); - IScheduleContext context = createScheduleContext(mgmtService); - GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - - createJoinPolicy((InMemScheduleConext) context, JOIN_POLICY_1, Arrays.asList(STREAM1, STREAM2)); - - ps.init(context, mgmtService); - ScheduleOption option = new ScheduleOption(); - ps.schedule(option); - ScheduleState state = ps.getState(); - - context = ps.getContext(); // context updated! 
- // assert - Assert.assertTrue(context.getPolicyAssignments().containsKey(JOIN_POLICY_1)); - Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - PolicyAssignment pa1 = context.getPolicyAssignments().get(JOIN_POLICY_1); - PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_1); - Assert.assertNotEquals(pa1.getQueueId(), pa2.getQueueId()); - - StreamWorkSlotQueue joinPair = getQueue(context, pa1.getQueueId()).getRight(); - String joinTopo = joinPair.getWorkingSlots().get(0).topologyName; - StreamWorkSlotQueue streamPair = getQueue(context, pa2.getQueueId()).getRight(); - String streamTopo = streamPair.getWorkingSlots().get(0).topologyName; - Assert.assertNotEquals(joinTopo, streamTopo); - - // TODO more assert on state - SpoutSpec joinSpout = state.getSpoutSpecs().get(joinTopo); - RouterSpec groupSpec = state.getGroupSpecs().get(joinTopo); - AlertBoltSpec alertSpec = state.getAlertSpecs().get(joinTopo); - - Assert.assertEquals(1, joinSpout.getStreamRepartitionMetadataMap().size()); - Assert.assertEquals(2, joinSpout.getStreamRepartitionMetadataMap().get(TEST_TOPIC).size()); - - Assert.assertEquals(2, groupSpec.getRouterSpecs().size()); - - LOG.info(new ObjectMapper().writeValueAsString(state)); - } - - private void createJoinPolicy(InMemScheduleConext context, String policyName, List<String> asList) { - PolicyDefinition pd = new PolicyDefinition(); - pd.setParallelismHint(5); - Definition def = new Definition(); - pd.setDefinition(def); - pd.setName(policyName); - pd.setInputStreams(asList); - pd.setOutputStreams(Arrays.asList("outputStream2")); - for (String streamId : pd.getInputStreams()) { - StreamPartition par = new StreamPartition(); - par.setColumns(Arrays.asList("col1")); - par.setType(StreamPartition.Type.GROUPBY); - par.setStreamId(streamId); - pd.addPartition(par); - } - context.addPoilcy(pd); - } - - @Test - public void testIrregularPolicyParallelismHint() { - Config config = ConfigFactory.load(); - int 
defaultParallelism = config.getInt("coordinator.policyDefaultParallelism"); - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 12); - InMemScheduleConext context = createScheduleContext(mgmtService); - // recreate test poicy - context.getPolicies().clear(); - // make the hint bigger than bolt number - int irregularParallelism = defaultParallelism + 2; - createSamplePolicy(context, "irregularPolicy", STREAM1, irregularParallelism); - GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - - ps.init(context, mgmtService); - - ScheduleState scheduled = ps.schedule(new ScheduleOption()); - Assert.assertEquals(2, scheduled.getSpoutSpecs().size()); - Assert.assertEquals(2, scheduled.getGroupSpecs().size()); - Assert.assertEquals(2, scheduled.getAlertSpecs().size()); - // assertion - RouterSpec spec = scheduled.getGroupSpecs().get(TOPO1); - Assert.assertTrue(spec.getRouterSpecs().size() > 0); // must be allocated - for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) { - Assert.assertEquals(1, routerSpec.getTargetQueue().size()); - // irregularParallelism is prompted to 2 * defaultParallelism = 10 - Assert.assertEquals(10, routerSpec.getTargetQueue().get(0).getWorkers().size()); - } - } - - @Test - public void testDataSources() throws Exception { - InMemScheduleConext context = loadContext("/multi/"); - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(4, 10); - - GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - ps.init(context, mgmtService); - - ScheduleState state = ps.schedule(new ScheduleOption()); - Assert.assertNotNull(state); - Assert.assertEquals(2, state.getAssignments().size()); - Assert.assertEquals(1, state.getAlertSpecs().size()); - Assert.assertEquals(10, state.getAlertSpecs().get("alertUnitTopology_1").getBoltPolicyIdsMap().size()); - } - - private InMemScheduleConext loadContext(String base) throws Exception { - InMemScheduleConext context = new InMemScheduleConext(); - - List<Kafka2TupleMetadata> 
metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class); - for (Kafka2TupleMetadata k : metadata) { - context.addDataSource(k); - } - - List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class); - for (PolicyDefinition p : policies) { - context.addPoilcy(p); - } - - List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class); - for (Publishment pub : pubs) { - context.addPublishment(pub); - } - - List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class); - for (StreamDefinition def : defs) { - context.addSchema(def); - } - - List<Topology> topos = loadEntities(base + "topologies.json", Topology.class); - for (Topology t : topos) { - context.addTopology(t); - - TopologyUsage u = new TopologyUsage(t.getName()); - for (String gnid : t.getGroupNodeIds()) { - u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid)); - } - for (String anid : t.getAlertBoltIds()) { - u.getAlertUsages().put(anid, new AlertBoltUsage(anid)); - } - context.addTopologyUsages(u); - } - - return context; - } - - public static <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { - System.out.println(FileUtils.readFileToString(new File(SchedulerTest.class.getResource(path).getPath()))); - JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); - List<T> l = mapper.readValue(SchedulerTest.class.getResourceAsStream(path), type); - return l; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java deleted file mode 100644 index 1f3baf5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.alert.coordinator; - -import com.google.common.base.Joiner; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.coordinator.ExclusiveExecutor; -import org.apache.eagle.alert.utils.ZookeeperEmbedded; -import org.junit.*; - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -
/**
 * Starts an embedded ZooKeeper on port 2181 and checks that two threads submitting
 * work to {@link ExclusiveExecutor} under the same ZooKeeper path both get to run it.
 *
 * <p>The class is {@code @Ignore}d, so it only runs when enabled by hand.
 */
@Ignore
public class TestExclusiveExecutor {

    ZookeeperEmbedded zkEmbed;

    /** Boots the embedded ZooKeeper and waits two seconds before the test body runs. */
    @Before
    public void setUp() throws Exception {
        zkEmbed = new ZookeeperEmbedded(2181);
        zkEmbed.start();

        Thread.sleep(2000);
    }

    /** Shuts the embedded ZooKeeper down, if {@link #setUp()} got far enough to create it. */
    @After
    public void tearDown() throws Exception {
        // JUnit runs @After even when @Before threw; avoid masking that failure with an NPE.
        if (zkEmbed != null) {
            zkEmbed.shutdown();
        }
    }

    /**
     * Runs two tasks through separate {@link ExclusiveExecutor} instances sharing one path and
     * asserts that both printed their marker line and set their completion flag.
     *
     * <p>{@code System.out} is redirected for the duration of the run so the marker lines can be
     * inspected; the original stream is always restored, even if the body throws.
     */
    @Test
    public void testConcurrency() throws Exception {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream capturingOut = new PrintStream(captured);
        PrintStream originalOut = System.out;

        ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkQuorum = "127.0.0.1:2181";
        zkConfig.zkRetryTimes = 3;
        zkConfig.zkRoot = "/";
        zkConfig.connectionTimeoutMs = 3000;
        zkConfig.zkRetryInterval = 1000;
        zkConfig.zkSessionTimeoutMs = 5000;

        String path = "/concurrenty";
        AtomicBoolean lock1 = new AtomicBoolean(false);
        AtomicBoolean lock2 = new AtomicBoolean(false);
        List<String> logs = new ArrayList<>();

        System.setOut(capturingOut);
        try {
            startExclusive(zkConfig, path, () -> {
                System.out.println("this is thread one");
                lock1.set(true);
            });
            startExclusive(zkConfig, path, () -> {
                System.out.println("this is thread two");
                lock2.set(true);
            });

            // Give both workers time to take their turn before inspecting the output.
            Thread.sleep(2000);

            System.out.flush();
            BufferedReader reader = new BufferedReader(new StringReader(captured.toString()));
            String line;
            while ((line = reader.readLine()) != null) {
                logs.add(line);
            }
        } finally {
            // Never leave stdout hijacked for the rest of the JVM's tests.
            System.setOut(originalOut);
        }

        System.out.println("Cached logs: " + Joiner.on("\n").join(logs));

        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));

        Assert.assertTrue(lock1.get());
        Assert.assertTrue(lock2.get());
    }

    /** Starts a thread that submits {@code task} to a fresh ExclusiveExecutor under {@code path}. */
    private static void startExclusive(ZKConfig zkConfig, String path, Runnable task) {
        new Thread(() -> {
            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
            try {
                executor.execute(path, task);
            } catch (TimeoutException e) {
                // The task's flag stays false, so the assertions in the test report the failure;
                // stderr is not redirected, so the reason remains visible.
                System.err.println("ExclusiveExecutor timed out on " + path + ": " + e);
            }
        }).start();
    }

}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java deleted file mode 100644 index 875bb81..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordinator.ExclusiveExecutor; -import org.apache.eagle.alert.utils.ZookeeperEmbedded; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -
/**
 * Smoke test that drives three concurrent callers through a ZooKeeper-guarded
 * {@link ExclusiveExecutor} using an embedded ZooKeeper on port 2181.
 *
 * <p>The test makes no assertions; it only prints each caller's result.
 */
public class TestGreedyScheduleCoordinator {

    /** Minimal stand-in for a coordinator that runs its work under the "/alert/test" path. */
    public static class GreedyScheduleCoordinator {

        /**
         * Sleeps for {@code input} milliseconds inside the exclusive section and returns
         * {@code input}.
         *
         * <p>NOTE(review): the returned value is read right after {@code execute} returns, which
         * assumes {@code ExclusiveExecutor.execute} blocks until the task has run; confirm.
         *
         * @param input sleep time in milliseconds, echoed back as the result
         * @return the value stored by the task
         * @throws TimeoutException propagated from {@code ExclusiveExecutor.execute}
         */
        public int schedule(int input) throws TimeoutException {
            Config config = ConfigFactory.load().getConfig("coordinator");
            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
            final AtomicInteger result = new AtomicInteger();
            try {
                executor.execute("/alert/test", () -> {
                    try {
                        Thread.sleep(input);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }

                    result.set(input);
                });
            } finally {
                // Close on success and on timeout alike so the executor is never leaked.
                try {
                    executor.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Previously this method always threw after the task ran, so callers never saw a result.
            return result.get();
        }
    }

    ZookeeperEmbedded zkEmbed;

    /** Boots the embedded ZooKeeper and waits two seconds before the test body runs. */
    @Before
    public void setUp() throws Exception {
        zkEmbed = new ZookeeperEmbedded(2181);
        zkEmbed.start();

        Thread.sleep(2000);
    }

    /** Shuts the embedded ZooKeeper down, if {@link #setUp()} got far enough to create it. */
    @After
    public void tearDown() throws Exception {
        // JUnit runs @After even when @Before threw; avoid masking that failure with an NPE.
        if (zkEmbed != null) {
            zkEmbed.shutdown();
        }
    }

    /** Starts three workers calling {@code schedule(1..3)} and waits at most 15 seconds for them. */
    @Test
    public void testMain() throws Exception {
        final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();

        Thread[] workers = {
            startWorker(coordinator, 1),
            startWorker(coordinator, 2),
            startWorker(coordinator, 3),
        };

        // Same 15 s upper bound as before, but return as soon as every worker is done.
        long deadline = System.currentTimeMillis() + 15000;
        for (Thread worker : workers) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining > 0) { // join(0) would wait forever
                worker.join(remaining);
            }
        }
    }

    /** Starts a thread that prints the result of {@code schedule(input)} and then idles for 1 s. */
    private static Thread startWorker(GreedyScheduleCoordinator coordinator, int input) {
        Thread worker = new Thread(() -> {
            try {
                System.out.println("output: " + coordinator.schedule(input));
            } catch (TimeoutException e) {
                System.err.println("schedule(" + input + ") timed out: " + e);
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java deleted file mode 100644 index c9d3b5e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.eagle.alert.coordinator.impl.MetadataValdiator; -import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; -import org.junit.Test; -
/**
 * Placeholder test for {@link MetadataValdiator}: it currently only checks that a validator
 * can be constructed over an empty in-memory schedule context.
 *
 * <p>Created on 10/2/16.
 */
public class TestMetadataValidator {

    // Not used yet; kept for the planned JSON-driven cases described in validate().
    private static final ObjectMapper om = new ObjectMapper();

    /** Builds a validator on an empty context; no validation call or assertion is made yet. */
    @Test
    public void validate() throws Exception {
        final InMemScheduleConext emptyContext = new InMemScheduleConext();
        final MetadataValdiator validator = new MetadataValdiator(emptyContext);

        // TODO add more test here, e.g. read "/validation/datasources.json" from the test
        // resources with om and populate the context before validating.
    }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java deleted file mode 100644 index 56ee980..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.alert.coordinator; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.alert.coordinator.mock.TestTopologyMgmtService; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordinator.impl.WorkQueueBuilder; -import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.ConfigFactory; - -/** - * @since Apr 27, 2016 - */ -public class WorkSlotStrategyTest { - - private static final Logger LOG = LoggerFactory.getLogger(WorkSlotStrategyTest.class); - - @Test - public void test() { - InMemScheduleConext context = new InMemScheduleConext(); - - StreamPartition partition = new StreamPartition(); - partition.setType(StreamPartition.Type.GLOBAL); - partition.setStreamId("s1"); - partition.setColumns(Arrays.asList("f1", "f2")); - - StreamGroup group = new StreamGroup(); - group.addStreamPartition(partition); - - - { - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 3, "prefix-time1", true); - SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService); - List<WorkSlot> slots = strategy.reserveWorkSlots(5, 
false, new HashMap<String, Object>()); - Assert.assertEquals(0, slots.size()); - Assert.assertEquals(1, context.getTopologies().size()); - } - - { - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-time2", true); - SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService); - List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>()); - Assert.assertEquals(5, slots.size()); - LOG.info(slots.get(0).getTopologyName()); - Assert.assertEquals(2, context.getTopologies().size()); - Assert.assertEquals(2, context.getTopologyUsages().size()); - - // assert all on same topology - for (WorkSlot ws : slots) { - Assert.assertEquals(slots.get(0).getTopologyName(), ws.getTopologyName()); - } - Iterator<TopologyUsage> it = context.getTopologyUsages().values().iterator(); - TopologyUsage usage = it.next(); - for (AlertBoltUsage u : usage.getAlertUsages().values()) { - Assert.assertTrue(u.getPartitions().size() == 0); - Assert.assertTrue(u.getQueueSize() == 0); - } - // assert - usage = it.next(); - for (AlertBoltUsage u : usage.getAlertUsages().values()) { - LOG.info(u.getBoltId()); - Assert.assertTrue(u.getPartitions().size() == 0); - Assert.assertTrue(u.getBoltId(), u.getQueueSize() == 0); - } - } - } - - @SuppressWarnings("unused") - @Test - public void test2_overlap() { - InMemScheduleConext context = new InMemScheduleConext(); - - StreamPartition partition = new StreamPartition(); - partition.setType(StreamPartition.Type.GLOBAL); - partition.setStreamId("s1"); - partition.setColumns(Arrays.asList("f1", "f2")); - StreamGroup sg = new StreamGroup(); - sg.addStreamPartition(partition, false); - - MonitoredStream ms1 = new MonitoredStream(sg); - - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-3", true); - - String topo1 = null; - String bolt1 = null; - WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService); - StreamWorkSlotQueue queue = 
wrb.createQueue(ms1, false, 5, new HashMap<String, Object>()); - { - Assert.assertEquals(5, queue.getWorkingSlots().size()); - topo1 = queue.getWorkingSlots().get(0).getTopologyName(); - bolt1 = queue.getWorkingSlots().get(0).getBoltId(); - Assert.assertEquals(1, context.getTopologies().size()); - Assert.assertEquals(1, context.getTopologyUsages().size()); - LOG.info(queue.getWorkingSlots().get(0).getTopologyName()); - for (WorkSlot ws : queue.getWorkingSlots()) { - Assert.assertEquals(topo1, ws.getTopologyName()); - } - - TopologyUsage usage = context.getTopologyUsages().values().iterator().next(); - for (AlertBoltUsage u : usage.getAlertUsages().values()) { - Assert.assertTrue(u.getPartitions().size() > 0); - Assert.assertTrue(u.getBoltId(), u.getQueueSize() > 0); - } - } - - // second partition - StreamPartition partition2 = new StreamPartition(); - partition2.setType(StreamPartition.Type.GLOBAL); - partition2.setStreamId("s2"); - partition2.setColumns(Arrays.asList("f1", "f2")); - - StreamGroup sg2 = new StreamGroup(); - sg2.addStreamPartition(partition2); - MonitoredStream ms2 = new MonitoredStream(sg2); - queue = wrb.createQueue(ms2, false, 5, new HashMap<String, Object>()); - { - Assert.assertEquals(5, queue.getWorkingSlots().size()); - Assert.assertEquals(2, context.getTopologies().size()); - Assert.assertEquals(2, context.getTopologyUsages().size()); - - String topo2 = queue.getWorkingSlots().get(0).getTopologyName(); - String bolt2 = queue.getWorkingSlots().get(0).getBoltId(); - for (WorkSlot ws : queue.getWorkingSlots()) { - Assert.assertEquals(topo2, ws.getTopologyName()); - } - Assert.assertNotEquals(topo1, topo2); - } - } - - @Test - public void testMultipleStreams() { - ConfigFactory.invalidateCaches(); - System.setProperty("config.resource", "/application-multiplestreams.conf"); - - InMemScheduleConext context = new InMemScheduleConext(); - - StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true); - StreamGroup group2 = 
createStreamGroup("s2", Arrays.asList("f2", "f3"), false); - StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false); - StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false); - - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true); - WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService); - { - StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - TopologyUsage usage = context.getTopologyUsages().values().iterator().next(); - - Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1)); - Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size()); - Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size()); - - List<String> group1Slots = new ArrayList<String>(); - getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> { - group1Slots.add(slot.getBoltId()); - }); - - StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2)); - Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size()); - Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size()); - getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> { - Assert.assertTrue(!group1Slots.contains(slot.getBoltId())); - }); - - - StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>()); - 
print(context.getTopologyUsages().values()); - - Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group3)); - Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().size()); - Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().size()); - getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().forEach(slot -> { - Assert.assertTrue(!group1Slots.contains(slot.getBoltId())); - }); - - StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group4)); - - } - } - - @Test - public void testMultipleStreamsWithoutReuse() { - ConfigFactory.invalidateCaches(); - System.setProperty("config.resource", "/application-multiplestreams2.conf"); - - InMemScheduleConext context = new InMemScheduleConext(); - - StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true); - StreamGroup group2 = createStreamGroup("s2", Arrays.asList("f2", "f3"), false); - StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false); - StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false); - - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true); - WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService); - { - StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - TopologyUsage usage = context.getTopologyUsages().values().iterator().next(); - - Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1)); - Assert.assertEquals(1, 
getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size()); - Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size()); - - List<String> group1Slots = new ArrayList<String>(); - getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> { - group1Slots.add(slot.getBoltId()); - }); - - StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2)); - Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size()); - Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size()); - getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> { - Assert.assertTrue(!group1Slots.contains(slot.getBoltId())); - }); - - - StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group3)); - - StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>()); - print(context.getTopologyUsages().values()); - - Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group4)); - - } - } - - private Map<StreamGroup, MonitoredStream> getMonitorStream(List<MonitoredStream> monitorStreams) { - Map<StreamGroup, MonitoredStream> result = new HashMap<StreamGroup, MonitoredStream>(); - monitorStreams.forEach(monitorStream -> { - result.put(monitorStream.getStreamGroup(), monitorStream); - }); - return result; - } - - private 
StreamGroup createStreamGroup(String streamId, List<String> columns, boolean dedicated) { - StreamPartition partition = new StreamPartition(); - partition.setType(StreamPartition.Type.GLOBAL); - partition.setStreamId(streamId); - partition.setColumns(columns); - - StreamGroup group = new StreamGroup(); - group.addStreamPartition(partition, dedicated); - return group; - } - - private void print(Collection<TopologyUsage> usages) { - try { - ObjectMapper om = new ObjectMapper(); - LOG.info(">>>" + om.writeValueAsString(usages)); - } catch (Exception e) {} - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java deleted file mode 100644 index 826cde4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator.mock; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.service.IMetadataServiceClient; - -/** - * According to metadata service client semantics, a change to a value returned by the interface should not directly change the - * states. 
- * - * @since May 5, 2016 - */ -@SuppressWarnings("serial") -public class InMemMetadataServiceClient implements IMetadataServiceClient { - - private List<StreamingCluster> clusters = new ArrayList<StreamingCluster>(); - private List<Topology> topologies = new ArrayList<Topology>(); - private List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>(); - private List<StreamDefinition> definitions = new ArrayList<StreamDefinition>(); - private List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>(); - - private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>(); - private List<SpoutSpec> spoutSpecs = new ArrayList<SpoutSpec>(); - private List<Publishment> publishmetns = new ArrayList<Publishment>(); - private List<AlertPublishEvent> alerts = new ArrayList<>(); - - @Override - public void close() throws IOException { - } - - @Override - public List<StreamingCluster> listClusters() { - return Collections.unmodifiableList(clusters); - } - - @Override - public List<Topology> listTopologies() { - return Collections.unmodifiableList(topologies); - } - - @Override - public List<PolicyDefinition> listPolicies() { - return Collections.unmodifiableList(policies); - } - - public void removePolicy(int idx) { - policies.remove(idx); - } - - @Override - public List<StreamDefinition> listStreams() { - return Collections.unmodifiableList(definitions); - } - - @Override - public List<Kafka2TupleMetadata> listDataSources() { - return Collections.unmodifiableList(datasources); - } - - @Override - public List<Publishment> listPublishment() { - return Collections.unmodifiableList(publishmetns); - } - - @Override - public List<SpoutSpec> listSpoutMetadata() { - return Collections.unmodifiableList(spoutSpecs); - } - - @Override - public ScheduleState getVersionedSpec() { - Iterator<Entry<String, ScheduleState>> it = scheduleStates.entrySet().iterator(); - if (it.hasNext()) { - return it.next().getValue(); - } - return null; 
- } - - @Override - public ScheduleState getVersionedSpec(String version) { - return scheduleStates.get(version); - } - - @Override - public void addScheduleState(ScheduleState state) { - scheduleStates.put(state.getVersion(), state); - } - - @Override - public void addStreamingCluster(StreamingCluster cluster) { - clusters.add(cluster); - } - - @Override - public void addStreamingClusters(List<StreamingCluster> clusters) { - this.clusters.addAll(clusters); - } - - @Override - public void addTopology(Topology t) { - topologies.add(t); - } - - @Override - public void addTopologies(List<Topology> topologies) { - this.topologies.addAll(topologies); - } - - @Override - public void addPolicy(PolicyDefinition policy) { - policies.add(policy); - } - - @Override - public void addPolicies(List<PolicyDefinition> policies) { - this.policies.addAll(policies); - } - - @Override - public void addStreamDefinition(StreamDefinition streamDef) { - definitions.add(streamDef); - } - - @Override - public void addStreamDefinitions(List<StreamDefinition> streamDefs) { - this.definitions.addAll(streamDefs); - } - - @Override - public void addDataSource(Kafka2TupleMetadata k2t) { - datasources.add(k2t); - } - - @Override - public void addDataSources(List<Kafka2TupleMetadata> k2ts) { - this.datasources.addAll(k2ts); - } - - @Override - public void addPublishment(Publishment pub) { - publishmetns.add(pub); - } - - @Override - public void addPublishments(List<Publishment> pubs) { - this.publishmetns.addAll(pubs); - } - - @Override - public void clear() { - // do nothing - } - - @Override - public void clearScheduleState(int maxCapacity) { - - } - - @Override - public List<AlertPublishEvent> listAlertPublishEvent() { - return this.alerts; - } - - @Override - public void addAlertPublishEvent(AlertPublishEvent event) { - this.alerts.add(event); - } - - @Override - public void addAlertPublishEvents(List<AlertPublishEvent> events) { - this.alerts.addAll(events); - } - -}
