http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java deleted file mode 100644 index 51dad41..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.app.environment.builder; - -import com.google.common.base.Preconditions; -import org.apache.eagle.app.utils.Clock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class CounterToRateFunction implements TransformFunction { - private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class); - private final Map<String, CounterValue> cache; - private MetricDescriptor metricDescriptor; - private Collector collector; - - public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) { - final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit); - this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) { - protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) { - final long now = clock.now(); - final long lastMod = eldest.getValue().getTimestamp(); - final boolean expired = (now - lastMod) > heartbeatMillis; - if (expired) { - LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey()); - } - return expired; - } - }; - this.metricDescriptor = metricDescriptor; - } - - @Override - public String getName() { - return "CounterToRate"; - } - - @Override - public void open(Collector collector) { - this.collector = collector; - } - - @Override - public void transform(Map event) { - Metric metric = toMetric(event); - LOG.debug("received {} metrics", metric); - if (new DefaultCountMetricFilter().apply(metric.getMetricName())) { - final String metricName = metric.getMetricName(); - final CounterValue prev = cache.get(metricName); - if (prev != null) { - final double rate = prev.computeRate(metric); - event.put(metricDescriptor.getValueField(), rate); - collector.collect(event.toString(), event); - } else { - CounterValue current = new CounterValue(metric); - cache.put(metricName, current); - } - } 
else { - collector.collect(event.toString(), event); - } - - } - - @Override - public void close() { - cache.clear(); - } - - private Metric toMetric(Map event) { - - String metricName = ""; - for (String dimensionField : metricDescriptor.getDimensionFields()) { - metricName += event.get(dimensionField) + "-"; - } - metricName += metricDescriptor.getMetricNameSelector().getMetricName(event); - - long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event); - - return new Metric(metricName, timestamp, getCurrentValue(event)); - } - - private double getCurrentValue(Map event) { - double[] values; - if (event.containsKey(metricDescriptor.getValueField())) { - values = new double[] {(double) event.get(metricDescriptor.getValueField())}; - } else { - LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); - values = new double[] {0}; - } - return values[0]; - } - - protected static class CounterValue { - private long timestamp; - private double value; - - public CounterValue(long timestamp, double value) { - this.timestamp = timestamp; - this.value = value; - } - - public CounterValue(Metric m) { - this(m.getTimestamp(), m.getNumberValue().doubleValue()); - } - - public long getTimestamp() { - return timestamp; - } - - public double computeRate(Metric m) { - final long currentTimestamp = m.getTimestamp(); - final double currentValue = m.getNumberValue().doubleValue(); - - final long durationMillis = currentTimestamp - timestamp; - final double delta = currentValue - value; - - timestamp = currentTimestamp; - value = currentValue; - - return computeRate(durationMillis, delta); - } - - private double computeRate(long durationMillis, double delta) { - final double millisPerSecond = 1000.0; - final double duration = durationMillis / millisPerSecond; - return (duration <= 0.0 || delta <= 0.0) ? 
0.0 : delta / duration; - } - - @Override - public String toString() { - return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}'; - } - } - - - protected final class Metric { - private final String metricName; - private final long timestamp; - private final Object value; - - public Metric(String metricName, long timestamp, Object value) { - this.metricName = Preconditions.checkNotNull(metricName, "metricName"); - this.timestamp = timestamp; - this.value = Preconditions.checkNotNull(value, "value"); - } - - public String getMetricName() { - return metricName; - } - - public long getTimestamp() { - return timestamp; - } - - public Object getValue() { - return value; - } - - public Number getNumberValue() { - return (Number) value; - } - - public boolean hasNumberValue() { - return (value instanceof Number); - } - - public boolean isCounter() { - return metricName.endsWith("count"); - } - - @Override - public boolean equals(Object obj) { - if (obj == null || !(obj instanceof Metric)) { - return false; - } - Metric m = (Metric) obj; - return metricName.equals(m.getMetricName()) - && timestamp == m.getTimestamp() - && value.equals(m.getValue()); - } - - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + value.hashCode(); - return result; - } - - @Override - public String toString() { - return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}'; - } - } - - private class DefaultCountMetricFilter implements CountMetricFilter { - @Override - public Boolean apply(String metricName) { - return metricName.endsWith("count"); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java deleted file mode 100644 index 04c5bf9..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.environment.builder; - -import java.util.Map; - -public class MaxFunction extends AggregateFunction { - @Override - public String getName() { - return "MAX"; - } - - @Override - public void open(Collector collector) { - throw new IllegalStateException("TODO: Not implemented yet."); - } - - @Override - public void transform(Map event) { - throw new IllegalStateException("TODO: Not implemented yet."); - } - - @Override - public void close() { - - throw new IllegalStateException("TODO: Not implemented yet."); - } - - public static MaxFunction maxOf(String aggFieldName) { - MaxFunction function = new MaxFunction(); - function.setAggFieldName(aggFieldName); - return function; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java deleted file mode 100644 index c33a92d..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
/**
 * Describes how to read a metric out of a generic event map: which fields carry the metric
 * name, group, site, timestamp, dimensions and value. Instances are configured through the
 * fluent methods, starting from one of the static {@code metricGroup...} factories.
 */
public class MetricDescriptor implements Serializable {

    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";

    /**
     * Support simple and complex name format, by default using "metric" field.
     */
    private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");

    /**
     * Support event/system time, by default using system time.
     */
    private TimestampSelector timestampSelector = new SystemTimestampSelector();

    /**
     * Metric dimension field name. Stays {@code null} until configured.
     */
    private List<String> dimensionFields;

    /**
     * Metric granularity, expressed as a {@link java.util.Calendar} field constant.
     */
    private int granularity = Calendar.MINUTE;

    private String valueField = "value";
    private String resourceField = "resource";

    public MetricNameSelector getMetricNameSelector() {
        return metricNameSelector;
    }

    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
        this.metricNameSelector = metricNameSelector;
    }

    public MetricGroupSelector getMetricGroupSelector() {
        return metricGroupSelector;
    }

    public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) {
        this.metricGroupSelector = metricGroupSelector;
    }

    public String getResourceField() {
        return resourceField;
    }

    public void setResourceField(String resourceField) {
        this.resourceField = resourceField;
    }

    public String getValueField() {
        return valueField;
    }

    public void setValueField(String valueField) {
        this.valueField = valueField;
    }

    /**
     * @return the configured dimension field names, or {@code null} if none were ever set
     */
    public List<String> getDimensionFields() {
        return dimensionFields;
    }

    public void setDimensionFields(List<String> dimensionFields) {
        this.dimensionFields = dimensionFields;
    }

    public TimestampSelector getTimestampSelector() {
        return timestampSelector;
    }

    public void setTimestampSelector(TimestampSelector timestampSelector) {
        this.timestampSelector = timestampSelector;
    }

    public int getGranularity() {
        return granularity;
    }

    public void setGranularity(int granularity) {
        this.granularity = granularity;
    }

    public SiteIdSelector getSiteIdSelector() {
        return siteIdSelector;
    }

    public void setSiteIdSelector(SiteIdSelector siteIdSelector) {
        this.siteIdSelector = siteIdSelector;
    }


    /** Picks the metric name out of an event. */
    @FunctionalInterface
    public interface MetricNameSelector extends Serializable {
        String getMetricName(Map event);
    }

    /** Picks the metric group out of an event. */
    @FunctionalInterface
    public interface MetricGroupSelector extends Serializable {
        String getMetricGroup(Map event);
    }

    /** Group selector that returns the same group name for every event. */
    public static class FixedMetricGroupSelector implements MetricGroupSelector {
        private final String groupName;

        private FixedMetricGroupSelector(String groupName) {
            this.groupName = groupName;
        }

        @Override
        public String getMetricGroup(Map event) {
            return groupName;
        }
    }

    /** Picks the timestamp of an event. */
    @FunctionalInterface
    public interface TimestampSelector extends Serializable {
        Long getTimestamp(Map event);
    }

    /** Picks the site id of an event. */
    @FunctionalInterface
    public interface SiteIdSelector extends Serializable {
        String getSiteId(Map event);
    }

    /** Site selector that returns the same site id for every event. */
    public class FixedSiteIdSelector implements SiteIdSelector {
        private final String siteId;

        private FixedSiteIdSelector(String siteId) {
            this.siteId = siteId;
        }

        @Override
        public String getSiteId(Map event) {
            return this.siteId;
        }
    }

    /** Site selector that reads the site id from an event field, falling back to "UNKNOWN". */
    private class FieldSiteIdSelector implements SiteIdSelector {
        private final String siteIdFieldName;

        public FieldSiteIdSelector(String siteIdFieldName) {
            this.siteIdFieldName = siteIdFieldName;
        }

        @Override
        public String getSiteId(Map event) {
            return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN");
        }
    }

    public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) {
        this.setMetricNameSelector(metricNameSelector);
        return this;
    }

    public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) {
        this.setSiteIdSelector(siteIdSelector);
        return this;
    }

    public MetricDescriptor siteAs(String siteId) {
        this.setSiteIdSelector(new FixedSiteIdSelector(siteId));
        return this;
    }

    /**
     * Reads the site id from the given event field.
     *
     * @param fieldName name of the event field holding the site id
     * @return this descriptor, for chaining
     */
    public MetricDescriptor siteByField(String fieldName) {
        // Must configure the SITE selector; installing a metric-name selector here would
        // silently overwrite the metric name configuration and leave the site untouched.
        this.setSiteIdSelector(new FieldSiteIdSelector(fieldName));
        return this;
    }

    /**
     * @see java.util.Calendar
     */
    public MetricDescriptor granularity(int granularity) {
        this.setGranularity(granularity);
        return this;
    }

    public MetricDescriptor namedByField(String nameField) {
        this.setMetricNameSelector(new FieldMetricNameSelector(nameField));
        return this;
    }

    public static MetricDescriptor metricGroupAs(String metricGroupName) {
        return metricGroupAs(new FixedMetricGroupSelector(metricGroupName));
    }

    public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) {
        MetricDescriptor metricDescriptor = new MetricDescriptor();
        metricDescriptor.setMetricGroupSelector(groupSelector);
        return metricDescriptor;
    }

    /**
     * Creates a descriptor whose metric group is read from an event field.
     *
     * @param fieldName        event field holding the group name
     * @param defaultGroupName group used when the event does not contain {@code fieldName}
     */
    public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) {
        MetricDescriptor metricDescriptor = new MetricDescriptor();
        metricDescriptor.setMetricGroupSelector((MetricGroupSelector) event -> {
            if (event.containsKey(fieldName)) {
                return (String) event.get(fieldName);
            } else {
                return defaultGroupName;
            }
        });
        return metricDescriptor;
    }

    public static MetricDescriptor metricGroupByField(String fieldName) {
        return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME);
    }

    public MetricDescriptor eventTimeByField(String timestampField) {
        this.setTimestampSelector(new EventTimestampSelector(timestampField));
        return this;
    }

    public MetricDescriptor dimensionFields(String... dimensionFields) {
        this.setDimensionFields(Arrays.asList(dimensionFields));
        return this;
    }

    public MetricDescriptor valueField(String valueField) {
        this.setValueField(valueField);
        return this;
    }

    /** Timestamp selector that reads the timestamp from an event field. */
    public class EventTimestampSelector implements TimestampSelector {
        private final String timestampField;

        EventTimestampSelector(String timestampField) {
            this.timestampField = timestampField;
        }

        /**
         * @return the field value as a {@code Long}; numeric values of any {@link Number} type
         *         are converted with {@code longValue()}, strings are parsed
         * @throws IllegalArgumentException if the event has no such field
         * @throws NumberFormatException    if the field is a string that is not a valid long
         */
        @Override
        public Long getTimestamp(Map event) {
            if (!event.containsKey(timestampField)) {
                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
            }
            Object timestampValue = event.get(timestampField);
            if (timestampValue instanceof Number) {
                // Covers Integer and Long as before, plus Short/Double/... which used to
                // fail with ClassCastException.
                return ((Number) timestampValue).longValue();
            }
            if (timestampValue instanceof String) {
                return Long.valueOf((String) timestampValue);
            }
            // null stays null; any other type still fails with ClassCastException.
            return (Long) timestampValue;
        }
    }

    /** Timestamp selector that ignores the event and returns the current system time. */
    public static class SystemTimestampSelector implements TimestampSelector {
        @Override
        public Long getTimestamp(Map event) {
            return System.currentTimeMillis();
        }
    }

    /** Name selector that reads the metric name from an event field. */
    public static class FieldMetricNameSelector implements MetricNameSelector {
        private final String fieldName;

        FieldMetricNameSelector(String fieldName) {
            this.fieldName = fieldName;
        }

        /**
         * @throws IllegalArgumentException if the event has no such field
         */
        @Override
        public String getMetricName(Map event) {
            if (event.containsKey(fieldName)) {
                return (String) event.get(fieldName);
            } else {
                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
            }
        }
    }
}
- */ -package org.apache.eagle.app.environment.builder; - -import backtype.storm.task.OutputCollector; - -import java.util.Arrays; - -import java.util.Map; - -public class StormOutputCollector implements Collector { - private final OutputCollector delegate; - - StormOutputCollector(OutputCollector delegate) { - this.delegate = delegate; - } - - @Override - public void collect(Object key, Map event) { - delegate.emit(Arrays.asList(key, event)); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java deleted file mode 100644 index 11974ff..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.environment.builder; - -import java.io.Serializable; -import java.util.Map; - -public interface TransformFunction extends Serializable { - String getName(); - - void open(Collector collector); - - void transform(Map event); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java deleted file mode 100644 index dbc7239..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.environment.builder; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import org.apache.eagle.app.utils.StreamConvertHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class TransformFunctionBolt extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(TransformFunctionBolt.class); - private final TransformFunction function; - private OutputCollector collector; - - public TransformFunctionBolt(TransformFunction function) { - this.function = function; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.function.open(new StormOutputCollector(collector)); - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - try { - this.function.transform(StreamConvertHelper.tupleToEvent(input).f1()); - this.collector.ack(input); - } catch (Throwable throwable) { - LOG.error("Transform error: {}", input, throwable); - this.collector.reportError(throwable); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("f1","f2")); - } - - @Override - public void cleanup() { - this.function.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java deleted 
file mode 100644 index 4c576cf..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; - -import org.apache.eagle.app.environment.AbstractEnvironment; -import com.typesafe.config.Config; - -/** - * Spark Execution Environment Context. 
- */ -public class SparkEnvironment extends AbstractEnvironment { - public SparkEnvironment(Config config) { - super(config); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java deleted file mode 100644 index 1cad34f..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.environment.impl; - -import org.apache.eagle.app.Application; -import org.apache.eagle.app.environment.ExecutionRuntime; -import org.apache.eagle.app.environment.ExecutionRuntimeProvider; -import com.typesafe.config.Config; -import org.apache.eagle.metadata.model.ApplicationEntity; - -public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> { - @Override - public void prepare(SparkEnvironment environment) { - throw new RuntimeException("Not implemented yet"); - } - - @Override - public SparkEnvironment environment() { - throw new RuntimeException("Not implemented yet"); - } - - @Override - public void start(Application executor, Config config) { - throw new RuntimeException("Not implemented yet"); - } - - @Override - public void stop(Application executor, Config config) { - throw new RuntimeException("Not implemented yet"); - } - - @Override - public ApplicationEntity.Status status(Application executor, Config config) { - throw new RuntimeException("Not implemented yet"); - } - - public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> { - @Override - public SparkExecutionRuntime get() { - return new SparkExecutionRuntime(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java deleted file mode 100644 index b5c7484..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; - -import org.apache.eagle.app.StaticApplication; - -/** - * Web Application Container. - */ -public class StaticApplicationExecutor { - private final StaticApplication webApplication; - - public StaticApplicationExecutor(StaticApplication webApplication) { - this.webApplication = webApplication; - } - - public StaticApplication getWebApplication() { - return webApplication; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java deleted file mode 100644 index 1c17b1f..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; - -import com.typesafe.config.Config; -import org.apache.eagle.app.environment.AbstractEnvironment; - -public class StaticEnvironment extends AbstractEnvironment { - public StaticEnvironment(Config config) { - super(config); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java deleted file mode 100644 index b6e3fed..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.app.environment.impl; - -import com.typesafe.config.Config; -import org.apache.eagle.app.Application; -import org.apache.eagle.app.environment.ExecutionRuntime; -import org.apache.eagle.app.environment.ExecutionRuntimeProvider; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * StaticExecutionRuntime. 
- */ -public class StaticExecutionRuntime implements ExecutionRuntime<StaticEnvironment,StaticApplicationExecutor> { - private static final Logger LOGGER = LoggerFactory.getLogger(StaticExecutionRuntime.class); - - private StaticEnvironment environment; - - @Override - public void prepare(StaticEnvironment environment) { - this.environment = environment; - } - - @Override - public StaticEnvironment environment() { - return this.environment; - } - - @Override - public void start(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) { - LOGGER.warn("Starting {}, do nothing",executor); - } - - @Override - public void stop(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) { - LOGGER.warn("Stopping {}, do nothing",executor); - } - - @Override - public ApplicationEntity.Status status(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) { - LOGGER.warn("Checking status {}, do nothing",executor); - return ApplicationEntity.Status.INITIALIZED; - } - - public static class Provider implements ExecutionRuntimeProvider<StaticEnvironment,StaticApplicationExecutor> { - @Override - public ExecutionRuntime<StaticEnvironment, StaticApplicationExecutor> get() { - return new StaticExecutionRuntime(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java deleted file mode 100644 index 6827eef..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java +++ /dev/null @@ -1,69 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; - -import org.apache.eagle.app.environment.AbstractEnvironment; -import org.apache.eagle.app.environment.builder.ApplicationBuilder; -import org.apache.eagle.app.environment.builder.MetricDescriptor; -import org.apache.eagle.app.environment.builder.TransformFunction; -import org.apache.eagle.app.environment.builder.TransformFunctionBolt; -import org.apache.eagle.app.messaging.*; -import com.typesafe.config.Config; - -/** - * Storm Execution Environment Context. 
- */ -public class StormEnvironment extends AbstractEnvironment { - public StormEnvironment(Config envConfig) { - super(envConfig); - } - - // ---------------------------------- - // Classic Storm Topology Builder API - // ---------------------------------- - public StormStreamSink getStreamSink(String streamId, Config config) { - return ((StormStreamSink) stream().getSink(streamId,config)); - } - - public StormStreamSource getStreamSource(String streamId, Config config) { - return (StormStreamSource) stream().getSource(streamId,config); - } - - public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) { - return new MetricStreamPersist(metricDescriptor, config); - } - - public EntityStreamPersist getEntityPersist(Config config) { - return new EntityStreamPersist(config); - } - - public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) { - return new MetricSchemaGenerator(metricDescriptor, config); - } - - public TransformFunctionBolt getTransformer(TransformFunction function) { - return new TransformFunctionBolt(function); - } - - // ---------------------------------- - // Fluent Storm App Builder API - // ---------------------------------- - - public ApplicationBuilder newApp(Config appConfig) { - return new ApplicationBuilder(appConfig, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java deleted file mode 100644 index 2b4180d..0000000 --- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.generated.*; -import backtype.storm.utils.NimbusClient; -import com.google.common.base.Preconditions; -import com.typesafe.config.ConfigRenderOptions; -import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer; -import org.apache.eagle.alert.metric.MetricConfigs; -import org.apache.eagle.app.Application; -import org.apache.eagle.app.environment.ExecutionRuntime; -import org.apache.eagle.app.environment.ExecutionRuntimeProvider; -import org.apache.eagle.app.utils.DynamicJarPathFinder; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.thrift7.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Int; -import storm.trident.spout.RichSpoutBatchExecutor; - -import java.util.List; -import java.util.Objects; - -public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> { - 
private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class); - private static LocalCluster _localCluster; - - private StormEnvironment environment; - private KillOptions killOptions; - - private static LocalCluster getLocalCluster() { - if (_localCluster == null) { - _localCluster = new LocalCluster(); - } - return _localCluster; - } - - public StormExecutionRuntime() { - this.killOptions = new KillOptions(); - this.killOptions.set_wait_secs(0); - } - - @Override - public void prepare(StormEnvironment environment) { - this.environment = environment; - } - - @Override - public StormEnvironment environment() { - return this.environment; - } - - public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; - - private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost"; - private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost"; - private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627; - private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort"; - - private static final String WORKERS = "workers"; - - private backtype.storm.Config getStormConfig(com.typesafe.config.Config config) { - backtype.storm.Config conf = new backtype.storm.Config(); - conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024)); - conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8)); - conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32)); - conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384)); - conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384)); - conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000)); - String nimbusHost = STORM_NIMBUS_HOST_DEFAULT; - if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) { - nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH); - 
LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost); - } else { - LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT); - } - Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT; - if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) { - nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH); - LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort); - } else { - LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT); - } - conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost); - conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort); - conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "backtype.storm.security.auth.SimpleTransportPlugin"); - if (config.hasPath(WORKERS)) { - conf.setNumWorkers(config.getInt(WORKERS)); - } - - if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) { - conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS)); - } - - if (config.hasPath(MetricConfigs.METRIC_SINK_CONF)) { - conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1); - } - return conf; - } - - @Override - public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) { - String topologyName = config.getString("appId"); - Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName()); - StormTopology topology = executor.execute(config, environment); - LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode")); - Config conf = getStormConfig(config); - if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) { - String jarFile = config.hasPath("jarPath") ? 
config.getString("jarPath") : null; - if (jarFile == null) { - jarFile = DynamicJarPathFinder.findPath(executor.getClass()); - } - synchronized (StormExecutionRuntime.class) { - System.setProperty("storm.jar", jarFile); - LOG.info("Submitting as cluster mode ..."); - try { - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology); - } catch (AlreadyAliveException | InvalidTopologyException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } finally { - System.clearProperty("storm.jar"); - } - } - } else { - LOG.info("Submitting as local mode ..."); - getLocalCluster().submitTopology(topologyName, conf, topology); - LOG.info("Submitted"); - } - LOG.info("Started {} ({})",topologyName,executor.getClass().getCanonicalName()); - } - - @Override - public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) { - String appId = config.getString("appId"); - LOG.info("Stopping topology {} ...", appId); - if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) { - Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient(); - try { - stormClient.killTopologyWithOpts(appId, this.killOptions); - } catch (NotAliveException | TException e) { - LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause()); - throw new RuntimeException(e.getMessage(),e); - } - } else { - getLocalCluster().killTopologyWithOpts(appId, this.killOptions); - } - LOG.info("Stopped topology {}", appId); - } - - @Override - public ApplicationEntity.Status status(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) { - String appId = config.getString("appId"); - LOG.info("Fetching {} status", appId); - List<TopologySummary> topologySummaries ; - ApplicationEntity.Status status = null; - try { - if (Objects.equals(config.getString("mode"), 
ApplicationEntity.Mode.CLUSTER.name())) { - Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient(); - topologySummaries = stormClient.getClusterInfo().get_topologies(); - } else { - topologySummaries = getLocalCluster().getClusterInfo().get_topologies(); - } - - for (TopologySummary topologySummary : topologySummaries) { - if (topologySummary.get_name().equalsIgnoreCase(appId)) { - if (topologySummary.get_status().equalsIgnoreCase("ACTIVE")) { - status = ApplicationEntity.Status.RUNNING; - } else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) { - status = ApplicationEntity.Status.STOPPED; - } else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) { - status = ApplicationEntity.Status.STOPPING; - } else { - LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status()); - status = ApplicationEntity.Status.UNKNOWN; - } - } - } - //If not exist, return removed - if (status == null) { - status = ApplicationEntity.Status.REMOVED; - } - } catch (TException e) { - LOG.error("Got error to fetch status of {}", appId, e); - status = ApplicationEntity.Status.UNKNOWN; - } - LOG.info("{} status is {}", appId, status); - return status; - } - - public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> { - @Override - public StormExecutionRuntime get() { - return new StormExecutionRuntime(); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java deleted file mode 100644 index 76d6e1b..0000000 --- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.impl; -import java.io.File; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.Nimbus; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.SubmitOptions; -import backtype.storm.generated.TopologySummary; -import backtype.storm.utils.BufferFileInputStream; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; -import backtype.storm.Config; - -/** - * copy from storm StormSubmitter - * just rewrite StormSubmitter that does not support summit other jars once submittedJar is set. - * Our implementation will not add this restrict. 
- * Use this class to submit topologies to run on the Storm cluster. You should run your program - * with the "storm jar" command from the command-line, and then use this class to - * submit your topologies. - */ - -public class StormSubmitter { - public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class); - - private static final int THRIFT_CHUNK_SIZE_BYTES = 307200; - - private static Nimbus.Iface localNimbus = null; - - public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) { - StormSubmitter.localNimbus = localNimbusHandler; - } - - /** - * Submits a topology to run on the cluster. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - */ - public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { - submitTopology(name, stormConf, topology, null, null); - } - - /** - * Submits a topology to run on the cluster. A topology runs forever or until - * explicitly killed. - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @param opts to manipulate the starting of the topology. - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - */ - public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) - throws AlreadyAliveException, InvalidTopologyException { - submitTopology(name, stormConf, topology, opts, null); - } - - /** - * Submits a topology to run on the cluster. 
A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @param opts to manipulate the starting of the topology - * @param progressListener to track the progress of the jar upload process - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - */ - public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, - ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException { - if (!Utils.isValidConf(stormConf)) { - throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); - } - stormConf = new HashMap(stormConf); - stormConf.putAll(Utils.readCommandLineOpts()); - Map conf = Utils.readStormConfig(); - conf.putAll(stormConf); - try { - String serConf = JSONValue.toJSONString(stormConf); - if (localNimbus != null) { - LOG.info("Submitting topology " + name + " in local mode"); - localNimbus.submitTopology(name, null, serConf, topology); - } else { - NimbusClient client = NimbusClient.getConfiguredClient(conf); - if (topologyNameExists(conf, name)) { - throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); - } - submitJar(conf, progressListener); - try { - LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); - if (opts != null) { - client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); - } else { - // this is for backwards compatibility - client.getClient().submitTopology(name, submittedJar, serConf, topology); - } - } catch (InvalidTopologyException e) { - LOG.warn("Topology submission exception: " + e.get_msg()); - throw e; - } catch (AlreadyAliveException e) { - LOG.warn("Topology already 
alive exception", e); - throw e; - } finally { - client.close(); - } - } - LOG.info("Finished submitting topology: " + name); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - */ - - public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { - submitTopologyWithProgressBar(name, stormConf, topology, null); - } - - /** - * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until - * explicitly killed. - * - * - * @param name the name of the storm. - * @param stormConf the topology-specific configuration. See {@link Config}. - * @param topology the processing to execute. 
- * @param opts to manipulate the starting of the topology - * @throws AlreadyAliveException if a topology with this name is already running - * @throws InvalidTopologyException if an invalid topology was submitted - */ - - public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException { - // show a progress bar so we know we're not stuck (especially on slow connections) - submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() { - @Override - public void onStart(String srcFile, String targetFile, long totalBytes) { - System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); - } - - @Override - public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) { - int length = 50; - int p = (int)((length * bytesUploaded) / totalBytes); - String progress = StringUtils.repeat("=", p); - String todo = StringUtils.repeat(" ", length - p); - - System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes); - } - - @Override - public void onCompleted(String srcFile, String targetFile, long totalBytes) { - System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); - } - }); - } - - private static boolean topologyNameExists(Map conf, String name) { - NimbusClient client = NimbusClient.getConfiguredClient(conf); - try { - ClusterSummary summary = client.getClient().getClusterInfo(); - for (TopologySummary s : summary.get_topologies()) { - if (s.get_name().equals(name)) { - return true; - } - } - return false; - - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - client.close(); - } - } - - private static String submittedJar = null; - - private static void submitJar(Map conf, ProgressListener listener) { - LOG.info("before uploaded, submittedJar = {}", submittedJar); - String localJar = 
System.getProperty("storm.jar"); - submittedJar = submitJar(conf, localJar, listener); - LOG.info("after uploaded, submittedJar = {}", submittedJar); - } - - /** - * Submit jar file - * @param conf the topology-specific configuration. See {@link Config}. - * @param localJar file path of the jar file to submit - * @return the remote location of the submitted jar - */ - public static String submitJar(Map conf, String localJar) { - return submitJar(conf, localJar, null); - } - - /** - * Submit jar file - * @param conf the topology-specific configuration. See {@link Config}. - * @param localJar file path of the jar file to submit - * @param listener progress listener to track the jar file upload - * @return the remote location of the submitted jar - */ - public static String submitJar(Map conf, String localJar, ProgressListener listener) { - if (localJar == null) { - throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); - } - - NimbusClient client = NimbusClient.getConfiguredClient(conf); - try { - String uploadLocation = client.getClient().beginFileUpload(); - LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation); - BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES); - - long totalSize = new File(localJar).length(); - if (listener != null) { - listener.onStart(localJar, uploadLocation, totalSize); - } - - long bytesUploaded = 0; - while (true) { - byte[] toSubmit = is.read(); - bytesUploaded += toSubmit.length; - if (listener != null) { - listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize); - } - - if (toSubmit.length == 0) { - break; - } - client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); - } - client.getClient().finishFileUpload(uploadLocation); - - if (listener != null) { - listener.onCompleted(localJar, uploadLocation, totalSize); - } - - LOG.info("Successfully 
uploaded topology jar to assigned location: " + uploadLocation); - return uploadLocation; - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - client.close(); - } - } - - /** - * Interface use to track progress of file upload. - */ - public interface ProgressListener { - /** - * called before file is uploaded. - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param totalBytes - total number of bytes of the file - */ - public void onStart(String srcFile, String targetFile, long totalBytes); - - /** - * called whenever a chunk of bytes is uploaded. - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param bytesUploaded - number of bytes transferred so far - * @param totalBytes - total number of bytes of the file - */ - public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes); - - /** - * called when the file is uploaded. - * @param srcFile - jar file to be uploaded - * @param targetFile - destination file - * @param totalBytes - total number of bytes of the file - */ - public void onCompleted(String srcFile, String targetFile, long totalBytes); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java deleted file mode 100644 index d1cecc9..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.metadata.model.StreamSinkConfig; - -public class DefaultStreamSinkConfig implements StreamSinkConfig { - private final Class<?> streamPersistClass; - private static final String NONE_STORAGE_TYPE = "NONE"; - - public DefaultStreamSinkConfig(Class<?> streamPersistClass) { - this.streamPersistClass = streamPersistClass; - } - - @Override - public String getType() { - return NONE_STORAGE_TYPE; - } - - public Class<?> getSinkType() { - return streamPersistClass; - } - - @Override - public Class<? 
extends StreamSinkConfig> getConfigType() { - return DefaultStreamSinkConfig.class; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java deleted file mode 100644 index e216dc6..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.app.messaging; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; - -public class EntityStreamPersist extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class); - - private final Config config; - private IEagleServiceClient client; - private OutputCollector collector; - private int batchSize; - private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>(); - - public EntityStreamPersist(Config config) { - this.config = config; - this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(config); - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - List<? extends TaggedLogAPIEntity> entities = (List<? 
extends TaggedLogAPIEntity>) input.getValue(0); - entityBucket.addAll(entities); - - if (entityBucket.size() < batchSize) { - return; - } - - try { - GenericServiceAPIResponseEntity response = client.create(entityBucket); - if (response.isSuccess()) { - LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp()); - collector.ack(input); - } else { - LOG.error("Service side error: {}", response.getException()); - collector.reportError(new IllegalStateException(response.getException())); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - collector.fail(input); - } - entityBucket.clear(); - } - - @Override - public void cleanup() { - try { - this.client.getJerseyClient().destroy(); - this.client.close(); - } catch (IOException e) { - LOG.error("Close client error: {}", e.getMessage(), e); - } finally { - super.cleanup(); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java deleted file mode 100644 index c8fe1b5..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.alert.engine.model.StreamEvent; -import backtype.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; - -public class FlattenEventMapper implements StreamEventMapper { - private final String streamId; - private static final String TIMESTAMP_FIELD = "timestamp"; - private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class); - - public FlattenEventMapper(String streamId) { - this.streamId = streamId; - } - - @Override - public List<StreamEvent> map(Tuple tuple) throws Exception { - long timestamp; - if (tuple.getFields().contains(TIMESTAMP_FIELD)) { - try { - timestamp = tuple.getLongByField("timestamp"); - } catch (Exception ex) { - // if timestamp is not null - LOGGER.error(ex.getMessage(), ex); - timestamp = 0; - } - } else { - timestamp = System.currentTimeMillis(); - } - Object[] values = new Object[tuple.getFields().size()]; - for (int i = 0; i < tuple.getFields().size(); i++) { - values[i] = tuple.getValue(i); - } - StreamEvent event = new StreamEvent(); - event.setTimestamp(timestamp); - event.setStreamId(streamId); - event.setData(values); - return Collections.singletonList(event); - } -} 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java deleted file mode 100644 index 987ed0b..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import backtype.storm.spout.Scheme; -import backtype.storm.tuple.Fields; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * General Json Schema. - * Different from org.apache.eagle.alert.engine.scheme.JsonScheme which is just to multi-topic cases. 
- * - * @see org.apache.eagle.alert.engine.scheme.JsonScheme - */ -public class JsonSchema implements Scheme { - private static final long serialVersionUID = -8352896475656975577L; - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonSchema.class); - private static final ObjectMapper mapper = new ObjectMapper(); - - @Override - public Fields getOutputFields() { - return new Fields("f1","f2"); - } - - @Override - @SuppressWarnings("rawtypes") - public List<Object> deserialize(byte[] ser) { - try { - if (ser != null) { - Map map = mapper.readValue(ser, Map.class); - return Arrays.asList(map.hashCode(), map); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Content is null, ignore"); - } - } - } catch (IOException e) { - try { - LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e); - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - } - } - return null; - } -} \ No newline at end of file
