http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java deleted file mode 100644 index 688e321..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.metric; - -import com.codahale.metrics.*; -import com.codahale.metrics.jvm.FileDescriptorRatioGauge; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import kafka.admin.AdminUtils; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; -import kafka.utils.ZKStringSerializer$; -import org.I0Itec.zkclient.ZkClient; -import org.apache.commons.io.FileUtils; -import org.apache.eagle.alert.metric.entity.MetricEvent; -import org.apache.eagle.alert.metric.sink.ConsoleSink; -import org.apache.eagle.alert.metric.sink.Slf4jSink; -import org.apache.eagle.alert.metric.source.JVMMetricSource; -import org.apache.eagle.alert.metric.source.MetricSource; -import org.apache.eagle.alert.utils.JsonUtils; -import org.apache.eagle.alert.utils.KafkaEmbedded; -import org.junit.*; -import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Scanner; - -public class MetricSystemTest { - - public static final String END_LINE = System.getProperty("line.separator"); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - private static final int DATA_BEGIN_INDEX = 55; - private static final String TOPIC = "alert_metric_test"; - private static final String clientName = "test"; - - @Test - public void testMetricEvent() { - MetricEvent metricEvent = MetricEvent.of("test").build(); - Assert.assertEquals(metricEvent.get("name"), "test"); - Assert.assertNotNull(metricEvent.get("timestamp")); - - metricEvent = MetricEvent.of("test1").build(); - metricEvent.put("timestamp", 1); - Assert.assertEquals(metricEvent.get("name"), "test1"); - Assert.assertEquals(metricEvent.get("timestamp"), 1); - - Counter counter = new Counter(); - counter.inc(10); - metricEvent = MetricEvent.of("testcount").from(counter).build(); - Assert.assertEquals(metricEvent.get("count"), 10l); - - Gauge gauge = Mockito.mock(FileDescriptorRatioGauge.class); - Mockito.when(gauge.getValue()).thenReturn(new Double("0.4")); - metricEvent = MetricEvent.of("testGauge").from(gauge).build(); - Assert.assertEquals(metricEvent.get("value"), 0.4); - - //Histogram - Histogram histogram = Mockito.mock(Histogram.class); - Snapshot snapshot = Mockito.mock(Snapshot.class); - Mockito.when(histogram.getCount()).thenReturn(11l); - Mockito.when(histogram.getSnapshot()).thenReturn(snapshot); - Mockito.when(snapshot.getMin()).thenReturn(1l); - Mockito.when(snapshot.getMax()).thenReturn(2l); - Mockito.when(snapshot.getMean()).thenReturn(3d); - Mockito.when(snapshot.getStdDev()).thenReturn(4d); - Mockito.when(snapshot.getMedian()).thenReturn(5d); - Mockito.when(snapshot.get75thPercentile()).thenReturn(6d); - Mockito.when(snapshot.get95thPercentile()).thenReturn(7d); - Mockito.when(snapshot.get98thPercentile()).thenReturn(8d); - Mockito.when(snapshot.get99thPercentile()).thenReturn(9d); - Mockito.when(snapshot.get999thPercentile()).thenReturn(10d); - metricEvent = MetricEvent.of("testHistogram").from(histogram).build(); - - Assert.assertEquals(metricEvent.get("count"), 11l); - Assert.assertEquals(metricEvent.get("min"), 1l); - Assert.assertEquals(metricEvent.get("max"), 2l); - Assert.assertEquals(metricEvent.get("mean"), 3d); - Assert.assertEquals(metricEvent.get("stddev"), 4d); - Assert.assertEquals(metricEvent.get("median"), 5d); - Assert.assertEquals(metricEvent.get("75thPercentile"), 6d); - Assert.assertEquals(metricEvent.get("95thPercentile"), 7d); - Assert.assertEquals(metricEvent.get("98thPercentile"), 8d); - Assert.assertEquals(metricEvent.get("99thPercentile"), 9d); - Assert.assertEquals(metricEvent.get("999thPercentile"), 10d); - - //Meter - Meter meter = Mockito.mock(Meter.class); - Mockito.when(meter.getCount()).thenReturn(1l); - Mockito.when(meter.getOneMinuteRate()).thenReturn(2d); - Mockito.when(meter.getFiveMinuteRate()).thenReturn(3d); - Mockito.when(meter.getFifteenMinuteRate()).thenReturn(4d); - Mockito.when(meter.getMeanRate()).thenReturn(5d); - metricEvent = MetricEvent.of("testMeter").from(meter).build(); - - Assert.assertEquals(metricEvent.get("value"), 1l); - Assert.assertEquals(metricEvent.get("1MinRate"), 2d); - Assert.assertEquals(metricEvent.get("5MinRate"), 3d); - Assert.assertEquals(metricEvent.get("15MinRate"), 4d); - Assert.assertEquals(metricEvent.get("mean"), 5d); - - //Timer - Timer value = Mockito.mock(Timer.class); - Mockito.when(value.getCount()).thenReturn(1l); - Mockito.when(value.getOneMinuteRate()).thenReturn(2d); - Mockito.when(value.getFiveMinuteRate()).thenReturn(3d); - Mockito.when(value.getFifteenMinuteRate()).thenReturn(4d); - Mockito.when(value.getMeanRate()).thenReturn(5d); - metricEvent = MetricEvent.of("testTimer").from(value).build(); - - Assert.assertEquals(metricEvent.get("value"), 1l); - Assert.assertEquals(metricEvent.get("1MinRate"), 2d); - Assert.assertEquals(metricEvent.get("5MinRate"), 3d); - Assert.assertEquals(metricEvent.get("15MinRate"), 4d); - Assert.assertEquals(metricEvent.get("mean"), 5d); - - } - - @Test - public void testMerticSystemWithKafkaSink() throws IOException { - - JVMMetricSource jvmMetricSource = mockMetricRegistry(); - //setup kafka - KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(); - makeSureTopic(kafkaEmbedded.getZkConnectionString()); - //setup metric system - File file = genKafkaSinkConfig(kafkaEmbedded.getBrokerConnectionString()); - Config config = ConfigFactory.parseFile(file); - MetricSystem system = MetricSystem.load(config); - system.register(jvmMetricSource); - system.start(); - system.report(); - - SimpleConsumer consumer = assertMsgFromKafka(kafkaEmbedded); - system.stop(); - consumer.close(); - kafkaEmbedded.shutdown(); - } - - @Test - public void testConsoleSink() throws IOException { - PrintStream console = System.out; - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - System.setOut(new PrintStream(bytes)); - - ConsoleSink sink = new ConsoleSink(); - MetricRegistry registry = new MetricRegistry(); - JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class); - Map<String, Metric> metrics = new HashMap<>(); - metrics.put("name", (Gauge) () -> "testname"); - metrics.put("uptime", (Gauge) () -> "testuptime"); - metrics.put("vendor", (Gauge) () -> "testvendor"); - Mockito.when(jvm.getMetrics()).thenReturn(metrics); - registry.registerAll(jvm); - File file = genConsoleSinkConfig(); - Config config = ConfigFactory.parseFile(file); - sink.prepare(config, registry); - sink.report(); - sink.stop(); - String result = bytes.toString(); - result = result.substring(result.indexOf(END_LINE) + END_LINE.length());//remove first line - Assert.assertEquals("" + END_LINE + "" + - "-- Gauges ----------------------------------------------------------------------" + END_LINE + "" + - "name" + END_LINE + "" + - " value = testname" + END_LINE + "" + - "uptime" + END_LINE + "" + - " value = testuptime" + END_LINE + "" + - "vendor" + END_LINE + "" + - " value = testvendor" + END_LINE + "" + - "" + END_LINE + "" + - "" + END_LINE + "", result); - System.setOut(console); - } - - @Test - public void testSlf4jSink() throws IOException { - PrintStream console = System.out; - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - System.setOut(new PrintStream(bytes)); - - Slf4jSink sink = new Slf4jSink(); - MetricRegistry registry = new MetricRegistry(); - JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class); - Map<String, Metric> metrics = new HashMap<>(); - metrics.put("name", (Gauge) () -> "testname"); - metrics.put("uptime", (Gauge) () -> "testuptime"); - metrics.put("vendor", (Gauge) () -> "testvendor"); - Mockito.when(jvm.getMetrics()).thenReturn(metrics); - registry.registerAll(jvm); - File file = genSlf4jSinkConfig(); - Config config = ConfigFactory.parseFile(file); - sink.prepare(config, registry); - sink.report(); - sink.stop(); - String result = bytes.toString(); - String finalResult = ""; - Scanner scanner = new Scanner(result); - while (scanner.hasNext()) { - finalResult += scanner.nextLine().substring(DATA_BEGIN_INDEX) + END_LINE; - } - Assert.assertEquals("type=GAUGE, name=name, value=testname" + END_LINE + "" + - "type=GAUGE, name=uptime, value=testuptime" + END_LINE + "" + - "type=GAUGE, name=vendor, value=testvendor" + END_LINE + "", finalResult); - System.setOut(console); - } - - private SimpleConsumer assertMsgFromKafka(KafkaEmbedded kafkaEmbedded) throws IOException { - SimpleConsumer consumer = new SimpleConsumer("localhost", kafkaEmbedded.getPort(), 100000, 64 * 1024, clientName); - long readOffset = getLastOffset(consumer, TOPIC, 0, kafka.api.OffsetRequest.EarliestTime(), clientName); - FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(TOPIC, 0, readOffset, 100000).build(); - FetchResponse fetchResponse = consumer.fetch(req); - Map<Integer, Map<String, String>> resultCollector = new HashMap<>(); - int count = 1; - for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(TOPIC, 0)) { - long currentOffset = messageAndOffset.offset(); - if (currentOffset < readOffset) { - System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset); - continue; - } - - readOffset = messageAndOffset.nextOffset(); - ByteBuffer payload = messageAndOffset.message().payload(); - - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); - String message = new String(bytes, "UTF-8"); - Map<String, String> covertedMsg = JsonUtils.mapper.readValue(message, Map.class); - covertedMsg.remove("timestamp"); - resultCollector.put(count, covertedMsg); - count++; - } - Assert.assertEquals("{1={name=heap.committed, value=175636480}, 2={name=heap.init, value=262144000}, 3={name=heap.max, value=3704094720}, 4={name=heap.usage, value=0.01570181876990446}, 5={name=heap.used, value=58491576}, 6={name=name, value=testname}, 7={name=non-heap.committed, value=36405248}, 8={name=non-heap.init, value=2555904}, 9={name=non-heap.max, value=-1}, 10={name=non-heap.usage, value=-3.5588712E7}, 11={name=non-heap.used, value=35596496}, 12={name=pools.Code-Cache.usage, value=0.020214080810546875}, 13={name=pools.Compressed-Class-Space.usage, value=0.0035556256771087646}, 14={name=pools.Metaspace.usage, value=0.9777212526244751}, 15={name=pools.PS-Eden-Space.usage, value=0.03902325058129612}, 16={name=pools.PS-Old-Gen.usage, value=0.001959359247654333}, 17={name=pools.PS-Survivor-Space.usage, value=0.0}, 18={name=total.committed, value=212107264}, 19={name=total.init, value=264699904}, 20={name=total.max, value=3704094719}, 21={name=total.used, value=94644240} , 22={name=uptime, value=testuptime}, 23={name=vendor, value=testvendor}}", resultCollector.toString()); - return consumer; - } - - private JVMMetricSource mockMetricRegistry() { - JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class); - Map<String, Metric> metrics = new HashMap<>(); - metrics.put("name", (Gauge) () -> "testname"); - metrics.put("uptime", (Gauge) () -> "testuptime"); - metrics.put("vendor", (Gauge) () -> "testvendor"); - Mockito.when(jvm.getMetrics()).thenReturn(metrics); - JVMMetricSource jvmMetricSource = new JVMMetricSource(); - Assert.assertEquals("jvm", jvmMetricSource.name()); - MetricRegistry realRegistry = jvmMetricSource.registry(); - Assert.assertTrue(realRegistry.remove("name")); - Assert.assertTrue(realRegistry.remove("uptime")); - Assert.assertTrue(realRegistry.remove("vendor")); - realRegistry.registerAll(jvm); - - MemoryUsageGaugeSet mem = Mockito.mock(MemoryUsageGaugeSet.class); - Map<String, Metric> memMetrics = new HashMap<>(); - Assert.assertTrue(realRegistry.remove("heap.committed")); - Assert.assertTrue(realRegistry.remove("heap.init")); - Assert.assertTrue(realRegistry.remove("heap.max")); - Assert.assertTrue(realRegistry.remove("heap.usage")); - Assert.assertTrue(realRegistry.remove("heap.used")); - Assert.assertTrue(realRegistry.remove("non-heap.committed")); - Assert.assertTrue(realRegistry.remove("non-heap.init")); - Assert.assertTrue(realRegistry.remove("non-heap.max")); - Assert.assertTrue(realRegistry.remove("non-heap.usage")); - Assert.assertTrue(realRegistry.remove("non-heap.used")); - Assert.assertTrue(realRegistry.remove("pools.Code-Cache.usage")); - Assert.assertTrue(realRegistry.remove("pools.Compressed-Class-Space.usage")); - Assert.assertTrue(realRegistry.remove("pools.Metaspace.usage")); - Assert.assertTrue(realRegistry.remove("pools.PS-Eden-Space.usage")); - Assert.assertTrue(realRegistry.remove("pools.PS-Old-Gen.usage")); - Assert.assertTrue(realRegistry.remove("pools.PS-Survivor-Space.usage")); - Assert.assertTrue(realRegistry.remove("total.committed")); - Assert.assertTrue(realRegistry.remove("total.init")); - Assert.assertTrue(realRegistry.remove("total.max")); - memMetrics.put("heap.committed", (Gauge) () -> 175636480); - memMetrics.put("heap.init", (Gauge) () -> 262144000); - memMetrics.put("heap.max", (Gauge) () -> 3704094720l); - memMetrics.put("heap.usage", (Gauge) () -> 0.01570181876990446); - memMetrics.put("heap.used", (Gauge) () -> 58491576); - memMetrics.put("non-heap.committed", (Gauge) () -> 36405248); - memMetrics.put("non-heap.init", (Gauge) () -> 2555904); - memMetrics.put("non-heap.max", (Gauge) () -> -1); - memMetrics.put("non-heap.usage", (Gauge) () -> -3.5588712E7); - memMetrics.put("non-heap.used", (Gauge) () -> 35596496); - memMetrics.put("pools.Code-Cache.usage", (Gauge) () -> 0.020214080810546875); - memMetrics.put("pools.Compressed-Class-Space.usage", (Gauge) () -> 0.0035556256771087646); - memMetrics.put("pools.Metaspace.usage", (Gauge) () -> 0.9777212526244751); - memMetrics.put("pools.PS-Eden-Space.usage", (Gauge) () -> 0.03902325058129612); - memMetrics.put("pools.PS-Old-Gen.usage", (Gauge) () -> 0.001959359247654333); - memMetrics.put("pools.PS-Survivor-Space.usage", (Gauge) () -> 0.0); - memMetrics.put("total.committed", (Gauge) () -> 212107264); - memMetrics.put("total.init", (Gauge) () -> 264699904); - memMetrics.put("total.max", (Gauge) () -> 3704094719l); - memMetrics.put("total.used", (Gauge) () -> 94644240); - Mockito.when(mem.getMetrics()).thenReturn(memMetrics); - Assert.assertTrue(realRegistry.remove("total.used")); - realRegistry.registerAll(mem); - return jvmMetricSource; - } - - private long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); - OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); - OffsetResponse response = consumer.getOffsetsBefore(request); - if (response.hasError()) { - System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition)); - return 0; - } - long[] offsets = response.offsets(topic, partition); - return offsets[0]; - } - - private File genKafkaSinkConfig(String brokerConnectionString) throws IOException { - File file = tempFolder.newFile("application.conf"); - String fileContent = "{" + END_LINE + "" + - " metric {" + END_LINE + "" + - " sink {" + END_LINE + "" + - " kafka {" + END_LINE + "" + - " \"topic\": \"" + TOPIC + "\"" + END_LINE + "" + - " \"bootstrap.servers\": \"" + brokerConnectionString + "\"" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - "}"; - FileUtils.writeStringToFile(file, fileContent); - return file; - } - - private File genConsoleSinkConfig() throws IOException { - File file = tempFolder.newFile("application-console.conf"); - String fileContent = "{" + END_LINE + "" + - " metric {" + END_LINE + "" + - " sink {" + END_LINE + "" + - " stdout {" + END_LINE + "" + - " // console metric sink" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - "}"; - FileUtils.writeStringToFile(file, fileContent); - return file; - } - - private File genSlf4jSinkConfig() throws IOException { - File file = tempFolder.newFile("application-slf4j.conf"); - String fileContent = "{" + END_LINE + "" + - " metric {" + END_LINE + "" + - " sink {" + END_LINE + "" + - " logger {" + END_LINE + "" + - " level = \"INFO\"" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " }"; - FileUtils.writeStringToFile(file, fileContent); - return file; - } - - public void makeSureTopic(String zkConnectionString) { - ZkClient zkClient = new ZkClient(zkConnectionString, 10000, 10000, ZKStringSerializer$.MODULE$); - Properties topicConfiguration = new Properties(); - AdminUtils.createTopic(zkClient, TOPIC, 1, 1, topicConfiguration); - } - - - @Test - @Ignore - public void testMetaConflict() { - MetricSystem system = MetricSystem.load(ConfigFactory.load()); - system.register(new MetaConflictMetricSource()); - system.start(); - system.report(); - system.stop(); - } - - private class MetaConflictMetricSource implements MetricSource { - private MetricRegistry registry = new MetricRegistry(); - - public MetaConflictMetricSource() { - registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict happening!"); - } - - @Override - public String name() { - return "metaConflict"; - } - - @Override - public MetricRegistry registry() { - return registry; - } - } - - private class SampleMetricSource implements MetricSource { - private MetricRegistry registry = new MetricRegistry(); - - public SampleMetricSource() { - registry.register("sample.long", (Gauge<Long>) System::currentTimeMillis); - registry.register("sample.map", (Gauge<Map<String, Object>>) () -> new HashMap<String, Object>() { - private static final long serialVersionUID = 3948508906655117683L; - - { - put("int", 1234); - put("str", "text"); - put("bool", true); - } - }); - } - - @Override - public String name() { - return "sampleSource"; - } - - @Override - public MetricRegistry registry() { - return registry; - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java deleted file mode 100644 index 50b00d9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.service; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.List; - -public class TestMetadataServiceClientImpl { - @SuppressWarnings("resource") - @Ignore - @Test - public void test() throws Exception { - MetadataServiceClientImpl impl = new MetadataServiceClientImpl("localhost", 58080, "/api/metadata/policies"); - List<PolicyDefinition> policies = impl.listPolicies(); - ObjectMapper mapper = new ObjectMapper(); - String ret = mapper.writeValueAsString(policies); - System.out.println(ret); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java deleted file mode 100644 index a97d09b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.util; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.commons.io.FileUtils; -import org.apache.eagle.alert.utils.ConfigUtils; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; - -public class ConfigUtilsTest { - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - public static final String END_LINE = System.getProperty("line.separator"); - - @Test - public void testToProperties() throws IOException { - Config config = ConfigFactory.parseFile(genConfig()); - Properties properties = ConfigUtils.toProperties(config); - System.out.print(properties); - Assert.assertEquals("{metric={sink={stdout={}, elasticsearch={hosts=[localhost:9200], index=alert_metric_test}, kafka={topic=alert_metric_test, bootstrap.servers=localhost:9092}, logger={level=INFO}}}, zkConfig={zkQuorum=localhost:2181, zkRoot=/alert}}", properties.toString()); - } - - private File genConfig() throws IOException { - File file = tempFolder.newFile("application-config.conf"); - String fileContent = "{" + END_LINE + "" + - " metric {" + END_LINE + "" + - " sink {" + END_LINE + "" + - " stdout {" + END_LINE + "" + - " // console metric sink" + END_LINE + "" + - " }" + END_LINE + "" + - " kafka {" + END_LINE + "" + - " \"topic\": \"alert_metric_test\"" + END_LINE + "" + - " \"bootstrap.servers\": \"localhost:9092\"" + END_LINE + "" + - " }" + END_LINE + "" + - " logger {" + END_LINE + "" + - " level = \"INFO\"" + END_LINE + "" + - " }" + END_LINE + "" + - " elasticsearch {" + END_LINE + "" + - " hosts = [\"localhost:9200\"]" + END_LINE + "" + - " index = \"alert_metric_test\"" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " }" + END_LINE + "" + - " zkConfig {" + END_LINE + "" + - " \"zkQuorum\": \"localhost:2181\"" + END_LINE + "" + - " \"zkRoot\": \"/alert\"" + END_LINE + "" + - " }" + END_LINE + "" + - "}"; - FileUtils.writeStringToFile(file, fileContent); - return file; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java deleted file mode 100644 index 6d6244a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.util; - -import org.apache.eagle.alert.utils.JsonUtils; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class JsonUtilsTest { - - @Test - public void testWriteValueAsString() { - - Map<String, Object> jsonMap = new HashMap<>(); - jsonMap.put("policyId", "policyId"); - jsonMap.put("streamId", "streamId"); - jsonMap.put("createBy", "createBy"); - jsonMap.put("createTime", "createTime"); - Assert.assertEquals("{\"createBy\":\"createBy\",\"policyId\":\"policyId\",\"streamId\":\"streamId\",\"createTime\":\"createTime\"}", JsonUtils.writeValueAsString(jsonMap)); - - jsonMap = new HashMap<>(); - Assert.assertEquals("{}", JsonUtils.writeValueAsString(jsonMap)); - - Assert.assertEquals("null", JsonUtils.writeValueAsString(null)); - - Assert.assertEquals("", JsonUtils.writeValueAsString(new Object())); - } -} - http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java deleted file mode 100644 index 2ea8915..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.util; - -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.junit.Assert; -import org.junit.Test; - -public class StreamIdConversionTest { - @Test - public void testGenerateStreamIdBetween() { - String result = StreamIdConversion.generateStreamIdBetween("source1", "target1"); - Assert.assertEquals("stream_source1_to_target1", result); - result = StreamIdConversion.generateStreamIdBetween("", "target1"); - Assert.assertEquals("stream__to_target1", result); - result = StreamIdConversion.generateStreamIdBetween("source1", null); - Assert.assertEquals("stream_source1_to_null", result); - } - - @Test - public void testGenerateStreamIdByPartition() { - String result = StreamIdConversion.generateStreamIdByPartition(1); - Assert.assertEquals("stream_1", result); - result = StreamIdConversion.generateStreamIdByPartition(-1); - Assert.assertEquals("stream_-1", result); - result = StreamIdConversion.generateStreamIdByPartition(0); - Assert.assertEquals("stream_0", result); - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java deleted file mode 100644 index 878f162..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.util; - -import org.apache.eagle.alert.utils.TimePeriodUtils; -import org.apache.eagle.common.DateTimeUtil; -import org.joda.time.Period; -import org.joda.time.Seconds; -import org.junit.Assert; -import org.junit.Test; - -import java.text.ParseException; - -public class TimePeriodUtilsTest { - @Test - public void testJodaTimePeriod() throws ParseException { - String periodText = "PT10m"; - Period period = new Period(periodText); - int seconds = period.toStandardSeconds().getSeconds(); - Assert.assertEquals(600, seconds); - Assert.assertEquals(60, period.toStandardSeconds().dividedBy(10).getSeconds()); - } - - @Test - public void testJodaTimePeriodBySeconds() throws ParseException { - String periodText = "PT10s"; - Period period = new Period(periodText); - int seconds = period.toStandardSeconds().getSeconds(); - Assert.assertEquals(10, seconds); - } - - @Test - public void testFormatSecondsByPeriod15M() throws ParseException { - - Period period = new Period("PT15m"); - Seconds seconds = period.toStandardSeconds(); - Assert.assertEquals(15 * 60, seconds.getSeconds()); - - long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12"); - long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:45:00"); - long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - - time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59"); - expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00"); - result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - - time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59"); - expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00"); - result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - } - - @Test - public void testFormatSecondsByPeriod1H() throws ParseException { - - Period period = new Period("PT1h"); - Seconds seconds = period.toStandardSeconds(); - Assert.assertEquals(60 * 60, seconds.getSeconds()); - - long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12"); - long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:00:00"); - long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - - time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59"); - expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00"); - result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - - time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:59"); - expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00"); - result = TimePeriodUtils.formatSecondsByPeriod(time, seconds); - Assert.assertEquals(expect, result); - } - - - @Test - public void testPeriod() { - Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m"))); - Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30 * 60 * 1000))); - Assert.assertEquals("PT1800S", Period.millis(30 * 60 * 1000).toString()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java deleted file mode 100644 index ff0a3f9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.correlation.meta; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.junit.Test; - -/** - * Created on 3/11/16. - */ -public class JsonTest { - - @Test - public void streamDefTest() throws Exception { - - ObjectMapper mapper = new ObjectMapper(); - mapper.readValue(JsonTest.class.getResourceAsStream("/streamDef.json"), StreamDefinition.class); - - com.fasterxml.jackson.databind.ObjectMapper mapper2 = new com.fasterxml.jackson.databind.ObjectMapper(); - mapper2.readValue(JsonTest.class.getResourceAsStream("/streamDef.json"), StreamDefinition.class); - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java deleted file mode 100644 index 524f76a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.correlation.meta; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; - -public class StreamPartitionTest { - @Test - public void testStreamPartitionEqual(){ - StreamPartition partition1 = new StreamPartition(); - partition1.setStreamId("unittest"); - partition1.setColumns(Arrays.asList("col1","col2")); - StreamPartition partition2 = new StreamPartition(); - partition2.setStreamId("unittest"); - partition2.setColumns(Arrays.asList("col1","col2")); - Assert.assertTrue(partition1.equals(partition2)); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java deleted file mode 100644 index 6cb3696..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.siddhiext; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.core.stream.output.StreamCallback; -import org.wso2.siddhi.core.util.EventPrinter; - -import java.util.concurrent.Semaphore; - -public class StringListSizeFunctionExtensionTest { - private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtensionTest.class); - - @Test - public void testStringListSize() throws Exception { - Semaphore semp = new Semaphore(1); - String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " + - " from log select string:listSize(switchLabel) as alertKey insert into output; "; - SiddhiManager manager = new SiddhiManager(); - ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql); - runtime.addCallback("output", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - Assert.assertTrue(events.length == 1); - Assert.assertTrue(Integer.parseInt(events[0].getData(0).toString()) == 5); - semp.release(); - } - }); - - runtime.start(); - - InputHandler logInput = runtime.getInputHandler("log"); - semp.acquire(); - Event e = new Event(); - e.setTimestamp(System.currentTimeMillis()); - String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]"; - String rhs = "[\"b\", \"d\"]"; - e.setData(new Object[] {System.currentTimeMillis(), ths, "port01", rhs}); - logInput.send(e); - - semp.acquire(); - runtime.shutdown(); - - } - - @Test - public void testStringListSize2() throws Exception { - Semaphore semp = new Semaphore(1); - String ql = " define stream log(timestamp long, site string, component string, resource string, host string, value string); " + - " from a = log[resource == \"hadoop.namenode.namenodeinfo.corruptfiles\"],\n" + - "b = log[component == a.component and resource == a.resource and host == a.host and a.value != b.value]\n" + - "select b.site as site, b.host as host, b.component as component, b.resource as resource, " + - "b.timestamp as timestamp, string:listSize(b.value) as newMissingBlocksNumber, string:listSize(a.value) as oldMissingBlocksNumber, string:subtract(b.value, a.value) as missingBlocks\n" + - "insert into output;"; - SiddhiManager manager = new SiddhiManager(); - ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql); - runtime.addCallback("output", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - Assert.assertTrue(events.length == 1); - Assert.assertTrue(Integer.parseInt(events[0].getData(5).toString()) == 5); - Assert.assertTrue(Integer.parseInt(events[0].getData(6).toString()) == 2); - Assert.assertTrue(events[0].getData(7).toString().equals("a\nc\ne")); - semp.release(); - } - }); - - runtime.start(); - - InputHandler logInput = runtime.getInputHandler("log"); - semp.acquire(); - Event e = new Event(); - e.setTimestamp(System.currentTimeMillis()); - String rhs = "[\"b\", \"d\"]"; - e.setData(new Object[] {System.currentTimeMillis(), "a", "a", "hadoop.namenode.namenodeinfo.corruptfiles", "port01", rhs}); - logInput.send(e); - - e.setTimestamp(System.currentTimeMillis()); - String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]"; - e.setData(new Object[] {System.currentTimeMillis(), "a", "a", "hadoop.namenode.namenodeinfo.corruptfiles", "port01", ths}); - logInput.send(e); - - semp.acquire(); - runtime.shutdown(); - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java deleted file mode 100644 index 4a31c69..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.siddhiext; - -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.core.stream.output.StreamCallback; -import org.wso2.siddhi.core.util.EventPrinter; - -import java.util.concurrent.Semaphore; - -public class StringSubtractFunctionExtensionTest { - private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtensionTest.class); - - @Test - public void testStringSubtract() throws Exception { - Semaphore semp = new Semaphore(1); - String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " + - " from log select string:subtract(switchLabel, message) as alertKey insert into output; "; - SiddhiManager manager = new SiddhiManager(); - ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql); - runtime.addCallback("output", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - Assert.assertTrue(events.length == 1); - Assert.assertTrue(events[0].getData(0).toString().equals("a\nc\ne")); - semp.release(); - } - }); - - runtime.start(); - - InputHandler logInput = runtime.getInputHandler("log"); - semp.acquire(); - Event e = new Event(); - e.setTimestamp(System.currentTimeMillis()); - String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]"; - String rhs = "[\"b\", \"d\"]"; - e.setData(new Object[] {System.currentTimeMillis(), ths, "port01", rhs}); - logInput.send(e); - - semp.acquire(); - runtime.shutdown(); - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf deleted file mode 100644 index b763acf..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - metric { - sink { - stdout { - // console metric sink - } - kafka { - "topic": "alert_metric_test" - "bootstrap.servers": "localhost:9092" - } - logger { - level = "INFO" - } - elasticsearch { - hosts = ["localhost:9200"] - index = "alert_metric_test" - } - } - } - zkConfig { - "zkQuorum": "localhost:2181" - "zkRoot": "/alert" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties deleted file mode 100644 index ba06033..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties +++ /dev/null @@ -1,19 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -log4j.rootLogger=DEBUG, stdout -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json deleted file mode 100644 index 5a78b6a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json +++ /dev/null @@ -1,45 +0,0 @@ -{ - "streamId": "perfmon_cpu_stream", - "dataSource": "perfmon_datasource", - "description": "the data stream for perfmon cpu metrics", - "validate": false, - "timeseries": false, - "columns": [ - { - "name": "host", - "type": "string", - "defaultValue": "", - "required": true - }, - { - "name": "timestamp", - "type": "long", - "defaultValue": 0, - "required": true - }, - { - "name": "floatField", - "type": "float", - "defaultValue": "1.2", - "required": true - }, - { - "name": "intField", - "type": "int", - "defaultValue": "3", - "required": true - }, - { - "name": "value", - "type": "double", - "defaultValue": 0.0, - "required": true - }, - { - "name": "boolField", - "type": "bool", - "defaultValue": true, - "required": true - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext deleted file mode 100644 index 7176611..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension -subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension -listSize=org.apache.eagle.alert.siddhiext.StringListSizeFunctionExtension \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore deleted file mode 100644 index b83d222..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target/ http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml deleted file mode 100644 index 9022843..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml +++ /dev/null @@ -1,113 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with ~ - this work for additional information regarding copyright ownership. ~ The - ASF licenses this file to You under the Apache License, Version 2.0 ~ (the - "License"); you may not use this file except in compliance with ~ the License. - You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ ~ Unless required by applicable law or agreed to in writing, software ~ - distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT - WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the - License for the specific language governing permissions and ~ limitations - under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-alert</artifactId> - <version>0.5.0-SNAPSHOT</version> - </parent> - - <artifactId>alert-coordinator</artifactId> - <packaging>jar</packaging> - - <name>Eagle::Core::Alert::Coordinator</name> - <url>http://maven.apache.org</url> - - <dependencies> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-servlet</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey.contribs</groupId> - <artifactId>jersey-multipart</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>com.typesafe</groupId> - <artifactId>config</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tomcat.embed</groupId> - <artifactId>tomcat-embed-core</artifactId> - </dependency> - <dependency> - <groupId>io.swagger</groupId> - <artifactId>swagger-jaxrs</artifactId> - </dependency> - <dependency> - <groupId>org.wso2.siddhi</groupId> - <artifactId>siddhi-core</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-maven-plugin</artifactId> - <configuration> - <scanIntervalSeconds>5</scanIntervalSeconds> - <httpConnector> - <port>9090</port> - </httpConnector> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.1.2</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>verify</phase> - <goals> - <goal>jar-no-fork</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java deleted file mode 100644 index cccf2e3..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import com.google.common.base.Stopwatch; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.config.ConfigBusProducer; -import org.apache.eagle.alert.config.ConfigValue; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.impl.MetadataValdiator; -import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; -import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger; -import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader; -import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener; -import org.apache.eagle.alert.coordinator.trigger.ScheduleStateCleaner; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.text.MessageFormat; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * @since Mar 24, 2016. - */ -public class Coordinator { - - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); - - private static final String COORDINATOR = "coordinator"; - - /** - * /alert/{topologyName}/spout - * /router - * /alert - * /publisher - * . - */ - private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout"; - private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router"; - private static final String ZK_ALERT_CONFIG_ALERT = "{0}/alert"; - private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher"; - - - private static final String METADATA_SERVICE_HOST = "metadataService.host"; - private static final String METADATA_SERVICE_PORT = "metadataService.port"; - private static final String METADATA_SERVICE_CONTEXT = "metadataService.context"; - private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis"; - private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis"; - private static final String DYNAMIC_SCHEDULE_STATE_CLEAR_MIN = "metadataDynamicCheck.stateClearPeriodMin"; - private static final String DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY = "metadataDynamicCheck.stateReservedCapacity"; - - private static final int DEFAULT_STATE_RESERVE_CAPACITY = 1000; - - public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader"; - - private volatile ScheduleState currentState = null; - private ZKConfig zkConfig = null; - private final IMetadataServiceClient client; - private Config config; - - // FIXME : UGLY global state - private static final AtomicBoolean forcePeriodicallyBuild = new AtomicBoolean(true); - - public Coordinator() { - config = ConfigFactory.load().getConfig(COORDINATOR); - zkConfig = ZKConfigBuilder.getZKConfig(config); - client = new MetadataServiceClientImpl(config); - } - - public Coordinator(Config config, ZKConfig zkConfig, IMetadataServiceClient client) { - this.config = config; - this.zkConfig = zkConfig; - this.client = client; - } - - public synchronized ScheduleState schedule(ScheduleOption option) throws TimeoutException { - ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig); - AtomicReference<ScheduleState> reference = new AtomicReference<>(); - try { - executor.execute(GREEDY_SCHEDULER_ZK_PATH, () -> { - ScheduleState state = null; - Stopwatch watch = Stopwatch.createStarted(); - IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext(); - TopologyMgmtService mgmtService = new TopologyMgmtService(); - IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler(); - - scheduler.init(context, mgmtService); - state = scheduler.schedule(option); - - long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS); - state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer - watch.reset(); - watch.start(); - - // persist & notify - try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) { - postSchedule(client, state, producer); - } - - watch.stop(); - long postTime = watch.elapsed(TimeUnit.MILLISECONDS); - LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime); - reference.set(state); - currentState = state; - }); - } catch (TimeoutException e1) { - LOG.error("time out when schedule", e1); - throw e1; - } finally { - try { - executor.close(); - } catch (IOException e) { - LOG.error("Exception when close exclusive executor, log and ignore!", e); - } - } - return reference.get(); - } - - public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) { - // persist state - client.addScheduleState(state); - - // notify - ConfigValue value = new ConfigValue(); - value.setValue(state.getVersion()); - value.setValueVersionId(true); - for (String topo : state.getSpoutSpecs().keySet()) { - producer.send(MessageFormat.format(ZK_ALERT_CONFIG_SPOUT, topo), value); - } - for (String topo : state.getGroupSpecs().keySet()) { - producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ROUTER, topo), value); - } - for (String topo : state.getAlertSpecs().keySet()) { - producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ALERT, topo), value); - } - for (String topo : state.getPublishSpecs().keySet()) { - producer.send(MessageFormat.format(ZK_ALERT_CONFIG_PUBLISHER, topo), value); - } - - } - - public ScheduleState getState() { - return currentState; - } - - public ValidateState validate() { - return new MetadataValdiator(client).validate(); - } - - /** - * shutdown background threads and release various resources. - */ - private static class CoordinatorShutdownHook implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class); - private ScheduledExecutorService executorSrv; - - public CoordinatorShutdownHook(ScheduledExecutorService executorSrv) { - this.executorSrv = executorSrv; - } - - @Override - public void run() { - LOG.info("start shutdown coordinator ..."); - LOG.info("Step 1 shutdown dynamic policy loader thread "); - // we should catch every exception to make best effort for clean - // shutdown - try { - executorSrv.shutdown(); - executorSrv.awaitTermination(2000, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - LOG.error("error shutdown dynamic policy loader", t); - } finally { - executorSrv.shutdownNow(); - } - } - } - - private static class PolicyChangeHandler implements PolicyChangeListener { - private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class); - private Config config; - private IMetadataServiceClient client; - - public PolicyChangeHandler(Config config, IMetadataServiceClient client) { - this.config = config; - this.client = client; - } - - @Override - public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, - Collection<String> removedPolicies, Collection<String> modifiedPolicies) { - LOG.info("policy changed ... "); - LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: " - + removedPolicies + ", modifiedPolicies: " + modifiedPolicies); - - CoordinatorTrigger trigger = new CoordinatorTrigger(config, client); - trigger.run(); - - } - } - - public static void startSchedule() { - Config config = ConfigFactory.load().getConfig(COORDINATOR); - String host = config.getString(METADATA_SERVICE_HOST); - int port = config.getInt(METADATA_SERVICE_PORT); - String context = config.getString(METADATA_SERVICE_CONTEXT); - IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context); - - // schedule dynamic policy loader - long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS); - long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS); - ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, r -> { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - }); - - DynamicPolicyLoader loader = new DynamicPolicyLoader(client); - loader.addPolicyChangeListener(new PolicyChangeHandler(config, client)); - scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS); - - if (config.hasPath(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN) && config.hasPath(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY)) { - int period = config.getInt(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN); - int capacity = config.getInt(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY); - ScheduleStateCleaner cleaner = new ScheduleStateCleaner(client, capacity); - scheduleSrv.scheduleAtFixedRate(cleaner, period, period, TimeUnit.MINUTES); - } - - Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv))); - LOG.info("Eagle Coordinator started ..."); - } - - public void enforcePeriodicallyBuild() { - forcePeriodicallyBuild.set(true); - } - - public void disablePeriodicallyBuild() { - forcePeriodicallyBuild.set(false); - } - - public static boolean isPeriodicallyForceBuildEnable() { - return forcePeriodicallyBuild.get(); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java deleted file mode 100644 index c026785..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -public class CoordinatorConstants { - public static final String CONFIG_ITEM_COORDINATOR = "coordinator"; - public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound"; - public static final String CONFIG_ITEM_BOLT_LOAD_UPBOUND = "boltLoadUpbound"; - public static final String POLICY_DEFAULT_PARALLELISM = "policyDefaultParallelism"; - public static final String BOLT_PARALLELISM = "boltParallelism"; - public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology"; - public static final String POLICIES_PER_BOLT = "policiesPerBolt"; - public static final String REUSE_BOLT_IN_STREAMS = "reuseBoltInStreams"; - public static final String STREAMS_PER_BOLT = "streamsPerBolt"; -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java deleted file mode 100644 index 7ebf26a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; - -/** - * @since Jun 16, 2016. - */ -public class CoordinatorListener implements ServletContextListener { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorListener.class); - - public CoordinatorListener() { - } - - @Override - public void contextInitialized(ServletContextEvent sce) { - LOG.info("start coordinator background tasks.."); - Coordinator.startSchedule(); - } - - @Override - public void contextDestroyed(ServletContextEvent sce) { - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java deleted file mode 100644 index 567e1e2..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator; - -import com.google.common.base.Stopwatch; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.retry.RetryNTimes; -import org.apache.curator.utils.CloseableUtils; -import org.apache.eagle.alert.config.ZKConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -public class ExclusiveExecutor implements Closeable { - - private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class); - - private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000; - private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3; - - public static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 3000; - public static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 100; //about 5 minutes - - private CuratorFramework client; - private LeaderSelector selector; - - public ExclusiveExecutor(ZKConfig zkConfig ) { - client = CuratorFrameworkFactory.newClient( - zkConfig.zkQuorum, - zkConfig.zkSessionTimeoutMs, - zkConfig.connectionTimeoutMs, - new RetryNTimes(ZK_RETRYPOLICY_MAX_RETRIES, ZK_RETRYPOLICY_SLEEP_TIME_MS) - ); - client.start(); - } - - public void execute(String path, final Runnable r) throws TimeoutException { - execute(path, r, ACQUIRE_LOCK_MAX_RETRIES_TIMES * ACQUIRE_LOCK_WAIT_INTERVAL_MS); - } - - public void execute(String path, final Runnable r, int timeoutMillis) throws TimeoutException { - final AtomicBoolean executed = new AtomicBoolean(false); - Stopwatch watch = Stopwatch.createUnstarted(); - watch.start(); - LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - // this callback will get called when you are the leader - // do whatever leader work you need to and only exit - // this method when you want to relinquish leadership - LOG.info("this is leader node right now.."); - executed.set(true); - try { - r.run(); - } catch (Throwable t) { - LOG.warn("failed to run exclusive executor", t); - } - LOG.info("leader node executed done!.."); - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString())); - } - - }; - - selector = new LeaderSelector(client, path, listener); - // selector.autoRequeue(); // not required, but this is behavior that you - // will probably expect - selector.start(); - - // wait for given times - while (watch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { //about 3 minutes waiting - if (!executed.get()) { - try { - Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS); - } catch (InterruptedException e) { - // ignored - } - continue; - } else { - break; - } - } - watch.stop(); - - if (!executed.get()) { - throw new TimeoutException(String.format("Get exclusive lock for operation on path %s failed due to wait too much time: %d ms", - path, watch.elapsed(TimeUnit.MILLISECONDS))); - } - LOG.info("Exclusive operation done with execution time (lock plus operation) {} ms !", watch.elapsed(TimeUnit.MILLISECONDS)); - } - - @Override - public void close() throws IOException { - if (selector != null) { - CloseableUtils.closeQuietly(this.selector); - } - if (client != null) { - CloseableUtils.closeQuietly(this.client); - } - } - -}
