http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java deleted file mode 100644 index 87a067f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.apache.eagle.alert.engine.coordinator.AlertDefinition; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.common.DateTimeUtil; -import org.apache.velocity.Template; -import org.apache.velocity.VelocityContext; -import org.apache.velocity.app.Velocity; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.runtime.RuntimeConstants; -import org.apache.velocity.runtime.resource.loader.StringResourceLoader; -import org.apache.velocity.runtime.resource.util.StringResourceRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -public class VelocityAlertTemplateEngine implements AlertTemplateEngine { - private static final String ALERT_BODY_TPL_PREFIX = "AlertBodyTemplate"; - private static final String ALERT_SUBJECT_TPL_PREFIX = "AlertSubjectTemplate"; - private static final Logger LOG = LoggerFactory.getLogger(VelocityAlertTemplateEngine.class); - private StringResourceRepository stringResourceRepository; - private Map<String, PolicyDefinition> policyDefinitionRepository; - private VelocityEngine engine; - - - @Override - public void init(Config config) { - engine = new VelocityEngine(); - engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute"); - engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName()); - engine.setProperty(Velocity.RESOURCE_LOADER, "string"); - engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName()); - engine.addProperty("string.resource.loader.repository.static", "false"); - engine.init(); - - stringResourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT); - policyDefinitionRepository = new HashMap<>(); - } - - private String getAlertBodyTemplateName(String policyId) { - return String.format("%s:%s", ALERT_BODY_TPL_PREFIX, policyId); - } - - private String getAlertSubjectTemplateName(String policyId) { - return String.format("%s:%s", ALERT_SUBJECT_TPL_PREFIX, policyId); - } - - @Override - public synchronized void register(PolicyDefinition policyDefinition) { - LOG.info("Registering {}", policyDefinition.getName()); - Preconditions.checkNotNull(policyDefinition.getName(), "policyId is null"); - AlertDefinition alertDefinition = policyDefinition.getAlertDefinition(); - if (alertDefinition == null) { - LOG.warn("Subject template of policy {} is null, using policy name by default"); - stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), policyDefinition.getName()); - - LOG.warn("Body template of policy {} is null, using $ALERT_EVENT by default"); - String defaultAlertBodyTmpl = String.format("Message: $%s (Auto-generated alert message as template not defined in policy %s)", - AlertContextFields.ALERT_EVENT, policyDefinition.getName()); - stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), defaultAlertBodyTmpl); - } else if (alertDefinition.getTemplateType().equals(AlertDefinition.TemplateType.TEXT)) { - if (alertDefinition.getSubject() != null) { - stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), alertDefinition.getSubject()); - } else { - LOG.warn("Subject template of policy {} is null, using policy name by default"); - stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), policyDefinition.getName()); - } - if (alertDefinition.getBody() != null) { - stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), alertDefinition.getBody()); - } else { - LOG.warn("Body template of policy {} is null, using ALERT_EVENT by default"); - stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), "$" + AlertContextFields.ALERT_EVENT); - } - } else { - throw new IllegalArgumentException("Unsupported alert template type " + alertDefinition.getTemplateType()); - } - policyDefinitionRepository.put(policyDefinition.getName(), policyDefinition); - } - - @Override - public synchronized void unregister(String policyId) { - LOG.info("Unregistering {}", policyId); - stringResourceRepository.removeStringResource(getAlertBodyTemplateName(policyId)); - stringResourceRepository.removeStringResource(getAlertSubjectTemplateName(policyId)); - policyDefinitionRepository.remove(policyId); - } - - @Override - public synchronized AlertStreamEvent filter(AlertStreamEvent event) { - Preconditions.checkArgument(this.policyDefinitionRepository.containsKey(event.getPolicyId()), "Unknown policyId " + event.getPolicyId()); - PolicyDefinition policyDefinition = this.policyDefinitionRepository.get(event.getPolicyId()); - StringWriter bodyWriter = new StringWriter(); - StringWriter subjectWriter = new StringWriter(); - try { - VelocityContext alertContext = buildAlertContext(policyDefinition, event); - Template template = engine.getTemplate(getAlertBodyTemplateName(event.getPolicyId())); - template.merge(alertContext, bodyWriter); - event.setBody(bodyWriter.toString()); - - template = engine.getTemplate(getAlertSubjectTemplateName(event.getPolicyId())); - template.merge(alertContext, subjectWriter); - event.setSubject(subjectWriter.toString()); - } finally { - try { - bodyWriter.close(); - } catch (IOException e) { - LOG.warn(e.getMessage(), e); - } - try { - subjectWriter.close(); - } catch (IOException e) { - LOG.warn(e.getMessage(), e); - } - } - return event; - } - - @Override - public synchronized Collection<PolicyDefinition> getPolicies() { - return policyDefinitionRepository.values(); - } - - private static VelocityContext buildAlertContext(PolicyDefinition policyDefinition, AlertStreamEvent event) { - VelocityContext context = new VelocityContext(); - context.put(AlertContextFields.SITE_ID, event.getSiteId()); - context.put(AlertContextFields.STREAM_ID, event.getStreamId()); - context.put(AlertContextFields.ALERT_ID, event.getAlertId()); - context.put(AlertContextFields.CREATED_BY, event.getCreatedBy()); - context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime()); - context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); - context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp()); - context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp())); - context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema()); - context.put(AlertContextFields.ALERT_EVENT, event); - - context.put(AlertContextFields.POLICY_ID, policyDefinition.getName()); - context.put(AlertContextFields.POLICY_DESC, policyDefinition.getDescription()); - context.put(AlertContextFields.POLICY_TYPE, policyDefinition.getDefinition().getType()); - context.put(AlertContextFields.POLICY_DEFINITION, policyDefinition.getDefinition().getValue()); - context.put(AlertContextFields.POLICY_HANDLER, policyDefinition.getDefinition().getHandlerClass()); - - for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) { - context.put(entry.getKey(), entry.getValue()); - } - return context; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java deleted file mode 100644 index a824a0d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.publisher.template; - -import org.apache.velocity.Template; -import org.apache.velocity.VelocityContext; -import org.apache.velocity.app.Velocity; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.exception.MethodInvocationException; -import org.apache.velocity.exception.ParseErrorException; -import org.apache.velocity.runtime.RuntimeConstants; -import org.apache.velocity.runtime.parser.node.ASTReference; -import org.apache.velocity.runtime.parser.node.ASTprocess; -import org.apache.velocity.runtime.resource.loader.StringResourceLoader; -import org.apache.velocity.runtime.resource.util.StringResourceRepository; -import org.apache.velocity.runtime.visitor.NodeViewMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class VelocityTemplateParser { - private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateParser.class); - private static final String TEMPLATE_NAME = "template"; - private final Template template; - private final ParserNodeVisitor visitor; - - public VelocityTemplateParser(String templateString) throws ParseErrorException { - VelocityEngine engine = new VelocityEngine(); - engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute"); - engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName()); - engine.setProperty(Velocity.RESOURCE_LOADER, "string"); - engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName()); - engine.addProperty("string.resource.loader.repository.static", "false"); - engine.addProperty("runtime.references.strict", "true"); - engine.init(); - StringResourceRepository resourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT); - resourceRepository.putStringResource(TEMPLATE_NAME, templateString); - template = engine.getTemplate(TEMPLATE_NAME); - ASTprocess data = (ASTprocess) template.getData(); - visitor = new ParserNodeVisitor(); - data.jjtAccept(visitor, null); - } - - public List<String> getReferenceNames() { - return this.visitor.getReferenceNames(); - } - - public Template getTemplate() { - return template; - } - - /** - * @throws MethodInvocationException if required variable is missing in context. - */ - public void validateContext(Map<String, Object> context) throws MethodInvocationException { - VelocityContext velocityContext = new VelocityContext(); - for (Map.Entry<String, Object> entry : context.entrySet()) { - velocityContext.put(entry.getKey(), entry.getValue()); - } - template.merge(velocityContext, new StringWriter()); - } - - private class ParserNodeVisitor extends NodeViewMode { - private List<String> referenceNames = new ArrayList<>(); - - @Override - public Object visit(ASTReference node, Object data) { - referenceNames.add(node.getRootString()); - return super.visit(node, data); - } - - public List<String> getReferenceNames() { - return this.referenceNames; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java deleted file mode 100644 index e1f3e9c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.router; - -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * Since 5/2/16. - */ -public interface AlertBoltSpecListener { - void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java deleted file mode 100644 index 598ce18..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.Map; - -/** - * Since 5/3/16. - */ -public interface SpoutSpecListener { - void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java deleted file mode 100644 index 88ffadb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.model.PartitionedEvent; - -import java.util.List; - - -public interface StreamOutputCollector { - void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception; - - void emit(List<Object> tuple); - - void ack(PartitionedEvent partitionedEvent); - - void fail(PartitionedEvent partitionedEvent); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java deleted file mode 100644 index 049e852..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import java.io.Serializable; - -/** - * <b></b> - * 1. Group by SingleStream[stream_1.col1] - * - * <p>Shuffle(stream_1,[col1])</p> - * - * <b></b> - * 2. Group by SingleStream[stream_1.col1,stream_1.col2] - * - * <p>Shuffle(stream_1,[col1,col2])</p> - * - * <b></b> - * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3] - * <p>Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)</p> - */ -public class StreamRoute implements Serializable { - private static final long serialVersionUID = 4649184902196034940L; - - private String targetComponentId; - private int partitionKey; - private String partitionType; - - public String getTargetComponentId() { - return targetComponentId; - } - - public void setTargetComponentId(String targetComponentId) { - this.targetComponentId = targetComponentId; - } - - public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type) { - this.setTargetComponentId(targetComponentId); - this.setPartitionKey(partitionKey); - this.setPartitionType(type); - } - - public int getPartitionKey() { - return partitionKey; - } - - public void setPartitionKey(int partitionKey) { - this.partitionKey = partitionKey; - } - - public StreamPartition.Type getPartitionType() { - return StreamPartition.Type.valueOf(partitionType); - } - - public void setPartitionType(StreamPartition.Type partitionType) { - this.partitionType = partitionType.name(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build(); - } - - @Override - public String toString() { - return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java deleted file mode 100644 index 0d397e4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner; - -import java.util.List; - -public class StreamRoutePartitionFactory { - /** - * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner. - * - * @param outputComponentIds - * @param streamDefinition - * @param partition - * @return - */ - public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) { - return new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java deleted file mode 100644 index 5b5632d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.model.StreamEvent; - -import java.util.List; - -public interface StreamRoutePartitioner { - List<StreamRoute> partition(StreamEvent event); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java deleted file mode 100644 index dfd2cc4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.Collection; -import java.util.Map; - -public interface StreamRouteSpecListener { - void onStreamRouterSpecChange(Collection<StreamRouterSpec> added, - Collection<StreamRouterSpec> removed, - Collection<StreamRouterSpec> modified, - Map<String, StreamDefinition> sds); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java deleted file mode 100644 index a9efc97..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.model.PartitionedEvent; - -import java.io.Serializable; - -public interface StreamRouter extends StreamSortSpecListener, Serializable { - void prepare(StreamContext context, PartitionedEventCollector outputCollector); - - void nextEvent(PartitionedEvent event); - - String getName(); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java deleted file mode 100644 index 0016fc0..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.coordination.model.RouterSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.Map; - -/** - * Listen to change on StreamRouterBoltSpec. - * @since 5/1/16. - */ -public interface StreamRouterBoltSpecListener { - void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java deleted file mode 100644 index 613ab7f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener; - -public interface StreamSortHandler extends StreamTimeClockListener { - - void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector); - - /** - * @param event StreamEvent. - */ - void nextEvent(PartitionedEvent event); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java deleted file mode 100644 index 087a46f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; - -import java.util.Map; - -public interface StreamSortSpecListener { - void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added, - Map<StreamPartition, StreamSortSpec> removed, - Map<StreamPartition, StreamSortSpec> changed); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java deleted file mode 100644 index a97e1da..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.StreamRoute; -import org.apache.eagle.alert.engine.router.StreamRoutePartitioner; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -public class BasicStreamRoutePartitioner implements StreamRoutePartitioner { - private final List<String> outputComponentIds; - private final StreamDefinition streamDefinition; - private final StreamPartition streamPartition; - - public BasicStreamRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) { - this.outputComponentIds = outputComponentIds; - this.streamDefinition = streamDefinition; - this.streamPartition = partition; - } - - @Override - public List<StreamRoute> partition(StreamEvent event) { - switch (this.streamPartition.getType()) { - case GLOBAL: - return routeToAll(event); - case GROUPBY: - return routeByGroupByKey(event); - default: - return routeByShuffle(event); - } - } - - protected List<StreamRoute> routeByGroupByKey(StreamEvent event) { - int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition, this.streamPartition.getColumns())).build(); - String selectedOutputStream = outputComponentIds.get(Math.abs(partitionKey) % this.outputComponentIds.size()); - return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY)); - } - - protected List<StreamRoute> routeByShuffle(StreamEvent event) { - long random = System.currentTimeMillis(); - int hash = Math.abs((int) random); - return Arrays.asList(new StreamRoute(outputComponentIds.get(hash % outputComponentIds.size()), -1, StreamPartition.Type.SHUFFLE)); - } - - protected List<StreamRoute> routeToAll(StreamEvent event) { - if (globalRoutingKeys != null) { - globalRoutingKeys = new ArrayList<>(); - for (String targetId : outputComponentIds) { - globalRoutingKeys.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL)); - } - } - return globalRoutingKeys; - } - - private List<StreamRoute> globalRoutingKeys = null; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java deleted file mode 100644 index 5c10675..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.trident.partition.GlobalGrouping; - -import java.util.*; - -public class RoutePhysicalGrouping implements CustomStreamGrouping { - private static final long serialVersionUID = -511915083994148362L; - private static final Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class); - private List<Integer> outdegreeTasks; - private ShuffleGrouping shuffleGroupingDelegate; - private GlobalGrouping globalGroupingDelegate; - private Map<String, Integer> connectedTargetIds; - - @Override - public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { - this.outdegreeTasks = new ArrayList<>(targetTasks); - shuffleGroupingDelegate = new ShuffleGrouping(); - shuffleGroupingDelegate.prepare(context, stream, targetTasks); - globalGroupingDelegate = new GlobalGrouping(); - globalGroupingDelegate.prepare(context, stream, targetTasks); - connectedTargetIds = new HashMap<>(); - for (Integer targetId : targetTasks) { - String targetComponentId = context.getComponentId(targetId); - connectedTargetIds.put(targetComponentId, targetId); - } - LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(), ",")); - } - - @Override - public List<Integer> chooseTasks(int taskId, List<Object> values) { - Object routingKeyObj = values.get(0); - if (routingKeyObj != null) { - PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj; - if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) { - return globalGroupingDelegate.chooseTasks(taskId, values); - } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) { - return Collections.singletonList(outdegreeTasks.get((int) (partitionedEvent.getPartitionKey() % this.outdegreeTasks.size()))); - } - // Shuffle by defaults - return shuffleGroupingDelegate.chooseTasks(taskId, values); - } - - LOG.warn("Illegal null StreamRoute, throw event"); - return Collections.emptyList(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java deleted file mode 100644 index 752c742..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router.impl; - -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; - -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * NOTE: This is copy from storm 1.0.0 code. DON'T modify it. - * - * @since May 4, 2016 - */ -public class ShuffleGrouping implements CustomStreamGrouping, Serializable { - private static final long serialVersionUID = 5035497345182141765L; - private Random random; - private ArrayList<List<Integer>> choices; - private AtomicInteger current; - - @Override - public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { - random = new Random(); - choices = new ArrayList<List<Integer>>(targetTasks.size()); - for (Integer i : targetTasks) { - choices.add(Arrays.asList(i)); - } - Collections.shuffle(choices, random); - current = new AtomicInteger(0); - } - - @Override - public List<Integer> chooseTasks(int taskId, List<Object> values) { - int rightNow; - int size = choices.size(); - while (true) { - rightNow = current.incrementAndGet(); - if (rightNow < size) { - return choices.get(rightNow); - } else if (rightNow == size) { - current.set(0); - //This should be thread safe so long as ArrayList does not have any internal state that can be messed up by multi-treaded access. - Collections.shuffle(choices, random); - return choices.get(0); - } - //race condition with another thread, and we lost - // try again - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java deleted file mode 100644 index 7b8f344..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.router.impl; - -import backtype.storm.task.OutputCollector; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.router.StreamOutputCollector; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; - -import java.util.Collections; -import java.util.List; - -public class StormOutputCollector implements StreamOutputCollector { - - private final OutputCollector outputCollector; - private final PartitionedEventSerializer serializer; - - public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) { - this.outputCollector = outputCollector; - this.serializer = serializer; - } - - public StormOutputCollector(OutputCollector outputCollector) { - this(outputCollector, null); - } - - @Override - public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception { - if (this.serializer == null) { - outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(partitionedEvent)); - } else { - outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(serializer.serialize(partitionedEvent))); - } - } - - @Override - public void emit(List<Object> tuple) { - outputCollector.emit(tuple); - } - - @Override - public void ack(PartitionedEvent partitionedEvent) { - outputCollector.ack(partitionedEvent.getAnchor()); - } - - @Override - public void fail(PartitionedEvent partitionedEvent) { - this.outputCollector.fail(partitionedEvent.getAnchor()); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java deleted file mode 100644 index 2eb101a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.router.impl; - -import com.google.common.collect.Lists; -import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.*; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * After sorting, one stream's message will be routed based on its StreamPartition - * One stream may have multiple StreamPartitions based on how this stream is grouped by. - * TODO: Add metric statistics - */ -public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener { - private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class); - private final StreamOutputCollector outputCollector; - private final Object outputLock = new Object(); - // private final List<String> outputStreamIds; - private final StreamContext streamContext; - private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap; - private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap; - private final String sourceId; - - public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext) { - this.sourceId = sourceId; - this.outputCollector = outputCollector; - this.routeSpecMap = new HashMap<>(); - this.routePartitionerMap = new HashMap<>(); - // this.outputStreamIds = outputStreamIds; - this.streamContext = streamContext; - } - - public void emit(PartitionedEvent event) { - try { - this.streamContext.counter().incr("send_count"); - StreamPartition partition = event.getPartition(); - List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition); - if (routerSpecs == null || routerSpecs.size() <= 0) { - if (LOG.isDebugEnabled()) { - // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side - LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap); - } - this.drop(event); - return; - } - - if (routePartitionerMap.get(partition) == null) { - LOG.error("Partitioner for " + routerSpecs.get(0) + " is null"); - synchronized (outputLock) { - this.streamContext.counter().incr("fail_count"); - this.outputCollector.fail(event); - } - return; - } - - StreamEvent newEvent = event.getEvent().copy(); - - // Get handler for the partition - List<StreamRoutePartitioner> queuePartitioners = routePartitionerMap.get(partition); - - synchronized (outputLock) { - for (StreamRoutePartitioner queuePartitioner : queuePartitioners) { - List<StreamRoute> streamRoutes = queuePartitioner.partition(newEvent); - // it is possible that one event can be sent to multiple slots in one slotqueue if that is All grouping - for (StreamRoute streamRoute : streamRoutes) { - String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId()); - try { - PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, partition, streamRoute.getPartitionKey()); - // Route Target Stream id instead of component id - if (LOG.isDebugEnabled()) { - LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent); - } - outputCollector.emit(targetStreamId, event); - this.streamContext.counter().incr("emit_count"); - } catch (RuntimeException ex) { - this.streamContext.counter().incr("fail_count"); - LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex); - throw ex; - } - } - } - outputCollector.ack(event); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - synchronized (outputLock) { - this.streamContext.counter().incr("fail_count"); - this.outputCollector.fail(event); - } - } - } - - @Override - public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added, - Collection<StreamRouterSpec> removed, - Collection<StreamRouterSpec> modified, - Map<String, StreamDefinition> sds) { - Map<StreamPartition, List<StreamRouterSpec>> copyRouteSpecMap = new HashMap<>(routeSpecMap); - Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap); - - // added StreamRouterSpec i.e. there is a new StreamPartition - for (StreamRouterSpec spec : added) { - if (copyRouteSpecMap.containsKey(spec.getPartition()) - && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { - LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec); - } else { - inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds); - } - } - - // removed StreamRouterSpec i.e. there is a deleted StreamPartition - for (StreamRouterSpec spec : removed) { - if (!copyRouteSpecMap.containsKey(spec.getPartition()) - || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { - LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec); - } else { - inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec); - } - } - - // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed - for (StreamRouterSpec spec : modified) { - if (!copyRouteSpecMap.containsKey(spec.getPartition()) - || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { - LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec); - } else { - inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec); - inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds); - } - } - - // switch - routeSpecMap = copyRouteSpecMap; - routePartitionerMap = copyRoutePartitionerMap; - } - - private void inplaceRemove(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap, - Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap, - StreamRouterSpec toBeRemoved) { - routeSpecMap.remove(toBeRemoved.getPartition()); - routePartitionerMap.remove(toBeRemoved.getPartition()); - } - - private void inplaceAdd(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap, - Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap, - StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) { - if (!routeSpecMap.containsKey(toBeAdded.getPartition())) { - routeSpecMap.put(toBeAdded.getPartition(), new ArrayList<StreamRouterSpec>()); - } - routeSpecMap.get(toBeAdded.getPartition()).add(toBeAdded); - try { - List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds, routePartitionerMap); - routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners); - } catch (Exception e) { - LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e); - routeSpecMap.remove(toBeAdded.getPartition()); - routePartitionerMap.remove(toBeAdded.getPartition()); - } - } - - private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, - Map<String, StreamDefinition> sds, - Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap) throws Exception { - List<StreamRoutePartitioner> routePartitioners = routePartitionerMap.get(streamRouterSpec.getPartition()); - if (routePartitioners == null) { - routePartitioners = new ArrayList<>(); - } - for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) { - routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner( - Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId), - sds.get(streamRouterSpec.getPartition().getStreamId()), - streamRouterSpec.getPartition())); - } - return routePartitioners; - } - - @Override - public void drop(PartitionedEvent event) { - synchronized (outputLock) { - this.streamContext.counter().incr("drop_count"); - if (event.getAnchor() != null) { - this.outputCollector.ack(event); - } else { - throw new IllegalStateException(event.toString() + " was not acked as anchor is null"); - } - } - } - - public void flush() { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java deleted file mode 100644 index 41523cc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router.impl; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.router.StreamRouter; -import org.apache.eagle.alert.engine.router.StreamSortHandler; -import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager; -import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl; -import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -public class StreamRouterImpl implements StreamRouter { - private static final long serialVersionUID = -4640125063690900014L; - private static final Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class); - private final String name; - private volatile Map<StreamPartition, StreamSortHandler> streamSortHandlers; - private PartitionedEventCollector outputCollector; - private StreamTimeClockManager streamTimeClockManager; - private StreamContext context; - - /** - * @param name This name should be formed by topologyId + router id, which is built by topology builder. - */ - public StreamRouterImpl(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - @Override - public void close() { - streamSortHandlers.values().forEach(StreamSortHandler::close); - streamTimeClockManager.close(); - } - - public void prepare(StreamContext context, PartitionedEventCollector outputCollector) { - this.streamTimeClockManager = new StreamTimeClockManagerImpl(); - this.streamSortHandlers = new HashMap<>(); - this.outputCollector = outputCollector; - this.context = context; - } - - /** - * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer. - * - * @param event StreamEvent - */ - public void nextEvent(PartitionedEvent event) { - this.context.counter().incr("receive_count"); - if (!dispatchToSortHandler(event)) { - this.context.counter().incr("direct_count"); - // Pass through directly if no need to sort - outputCollector.emit(event); - } - this.context.counter().incr("sort_count"); - // Update stream clock time if moving forward and trigger all tick listeners - streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp()); - } - - /** - * @param event input event. - * @return whether sorted. - */ - private boolean dispatchToSortHandler(PartitionedEvent event) { - if (event.getTimestamp() <= 0) { - return false; - } - - StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition()); - if (sortHandler == null) { - if (event.isSortRequired()) { - LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event); - this.context.counter().incr("miss_sort_count"); - } - return false; - } else { - sortHandler.nextEvent(event); - return true; - } - } - - @Override - public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added, - Map<StreamPartition, StreamSortSpec> removed, - Map<StreamPartition, StreamSortSpec> changed) { - synchronized (streamTimeClockManager) { - Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers); - // add new StreamSortSpec - if (added != null && added.size() > 0) { - for (Entry<StreamPartition, StreamSortSpec> spec : added.entrySet()) { - StreamPartition tmp = spec.getKey(); - if (copy.containsKey(tmp)) { - LOG.error("Metadata calculation error: Duplicated StreamSortSpec " + spec); - } else { - StreamSortHandler handler = new StreamSortWindowHandlerImpl(); - handler.prepare(tmp.getStreamId(), spec.getValue(), this.outputCollector); - copy.put(tmp, handler); - streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(tmp.getStreamId()), handler); - } - } - } - - // remove StreamSortSpec - if (removed != null && removed.size() > 0) { - for (Entry<StreamPartition, StreamSortSpec> spec : removed.entrySet()) { - StreamPartition tmp = spec.getKey(); - if (copy.containsKey(tmp)) { - copy.get(tmp).close(); - streamTimeClockManager.removeListener(copy.get(tmp)); - copy.remove(tmp); - } else { - LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec " + spec.getValue()); - } - } - } - - // modify StreamSortSpec - if (changed != null && changed.size() > 0) { - for (Entry<StreamPartition, StreamSortSpec> spec : changed.entrySet()) { - StreamPartition tmp = spec.getKey(); - if (copy.containsKey(tmp)) { - copy.get(tmp).close(); - streamTimeClockManager.removeListener(copy.get(tmp)); - copy.remove(tmp); - StreamSortHandler handler = new StreamSortWindowHandlerImpl(); - handler.prepare(tmp.getStreamId(), spec.getValue(), this.outputCollector); - copy.put(tmp, handler); - streamTimeClockManager.registerListener(tmp.getStreamId(), handler); - } else { - LOG.error("Metadata calculation error: modify non-existing StreamSortSpec " + spec.getValue()); - } - } - } - - // atomic switch - this.streamSortHandlers = copy; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java deleted file mode 100644 index ab05b48..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.engine.serialization.Serializers; -import org.apache.eagle.alert.utils.AlertConstants; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@SuppressWarnings( {"rawtypes", "serial"}) -public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider { - private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class); - private IMetadataChangeNotifyService changeNotifyService; - - public Config getConfig() { - return config; - } - - private Config config; - private List<String> outputStreamIds; - protected OutputCollector collector; - protected Map stormConf; - - private String boltId; - protected PartitionedEventSerializer serializer; - protected volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>(); - protected volatile String specVersion = "Not Initialized"; - protected volatile boolean specVersionOutofdate = false; - protected StreamContext streamContext; - - public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService, Config config) { - this.boltId = boltId; - this.changeNotifyService = changeNotifyService; - this.config = config; - } - - public void declareOutputStreams(List<String> outputStreamIds) { - this.outputStreamIds = outputStreamIds; - } - - protected List<String> getOutputStreamIds() { - return this.outputStreamIds; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet"); - this.stormConf = stormConf; - this.collector = collector; - this.serializer = Serializers.newPartitionedEventSerializer(this); - internalPrepare(collector, this.changeNotifyService, this.config, context); - try { - this.changeNotifyService.activateFetchMetaData(); - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - } - } - - - protected PartitionedEvent deserialize(Object object) throws IOException { - // byte[] in higher priority - if (object instanceof byte[]) { - return serializer.deserialize((byte[]) object); - } else if (object instanceof PartitionedEvent) { - return (PartitionedEvent) object; - } else { - throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName())); - } - } - - /** - * subclass should implement more initialization for example. - * 1) register metadata change - * 2) init stream context - * - * @param collector - * @param metadataManager - * @param config - * @param context - */ - public abstract void internalPrepare( - OutputCollector collector, - IMetadataChangeNotifyService metadataManager, - Config config, TopologyContext context); - - @Override - public void cleanup() { - super.cleanup(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.outputStreamIds != null) { - LOG.info("declare streams: {} ", outputStreamIds); - for (String streamId : this.outputStreamIds) { - declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0)); - } - } else { - declarer.declare(new Fields(AlertConstants.FIELD_0)); - } - } - - @Override - public StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException { - if (sdf.containsKey(streamId)) { - return sdf.get(streamId); - } else { - throw new StreamNotDefinedException(streamId, specVersion); - } - } - - public String getBoltId() { - return boltId; - } -} \ No newline at end of file
